diff --git a/.chloggen/connectors-propogate-mutates-data.yaml b/.chloggen/connectors-propogate-mutates-data.yaml new file mode 100755 index 00000000000..42e6b082058 --- /dev/null +++ b/.chloggen/connectors-propogate-mutates-data.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix bug where MutatesData would not correctly propagate through connectors. + +# One or more tracking issues or pull requests related to the change +issues: [9053] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/internal/fanoutconsumer/logs.go b/internal/fanoutconsumer/logs.go index 2d1e0336dc0..fb2cf90aa0b 100644 --- a/internal/fanoutconsumer/logs.go +++ b/internal/fanoutconsumer/logs.go @@ -44,7 +44,7 @@ type logsConsumer struct { } func (lsc *logsConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} + return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0} } // ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one. diff --git a/internal/fanoutconsumer/logs_test.go b/internal/fanoutconsumer/logs_test.go index c2cd5c42bd9..aedbea28e91 100644 --- a/internal/fanoutconsumer/logs_test.go +++ b/internal/fanoutconsumer/logs_test.go @@ -26,6 +26,12 @@ func TestLogsNotMultiplexing(t *testing.T) { assert.Same(t, nop, lfc) } +func TestLogsNotMultiplexingMutating(t *testing.T) { + p := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} + lfc := NewLogs([]consumer.Logs{p}) + assert.True(t, lfc.Capabilities().MutatesData) +} + func TestLogsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.LogsSink) p2 := new(consumertest.LogsSink) @@ -68,7 +74,7 @@ func TestLogsMultiplexingMutating(t *testing.T) { p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} lfc := NewLogs([]consumer.Logs{p1, p2, p3}) - assert.False(t, lfc.Capabilities().MutatesData) + assert.True(t, lfc.Capabilities().MutatesData) ld := testdata.GenerateLogs(1) for i := 0; i < 2; i++ { @@ -105,7 +111,7 @@ func TestReadOnlyLogsMultiplexingMutating(t *testing.T) { p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} lfc := NewLogs([]consumer.Logs{p1, p2, p3}) - assert.False(t, lfc.Capabilities().MutatesData) + assert.True(t, lfc.Capabilities().MutatesData) ldOrig := testdata.GenerateLogs(1) ld := testdata.GenerateLogs(1) ld.MarkReadOnly() @@ -142,7 +148,7 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) { p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)} lfc := NewLogs([]consumer.Logs{p1, p2, p3}) - assert.False(t, lfc.Capabilities().MutatesData) + assert.True(t, lfc.Capabilities().MutatesData) ld := testdata.GenerateLogs(1) for i := 0; i < 2; i++ { @@ -180,7 +186,7 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) { p3 := new(consumertest.LogsSink) lfc := NewLogs([]consumer.Logs{p1, p2, p3}) - assert.False(t, lfc.Capabilities().MutatesData) + assert.True(t, lfc.Capabilities().MutatesData) ld := testdata.GenerateLogs(1) for i := 0; i < 2; i++ { diff --git a/internal/fanoutconsumer/metrics.go b/internal/fanoutconsumer/metrics.go index ddc3761240b..039830abf47 100644 --- a/internal/fanoutconsumer/metrics.go +++ b/internal/fanoutconsumer/metrics.go @@ -42,7 +42,7 @@ type metricsConsumer struct { } func (msc *metricsConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} + return consumer.Capabilities{MutatesData: len(msc.mutable) > 0} } // ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one. diff --git a/internal/fanoutconsumer/metrics_test.go b/internal/fanoutconsumer/metrics_test.go index a58a7480c92..37fde6ad52a 100644 --- a/internal/fanoutconsumer/metrics_test.go +++ b/internal/fanoutconsumer/metrics_test.go @@ -26,6 +26,12 @@ func TestMetricsNotMultiplexing(t *testing.T) { assert.Same(t, nop, mfc) } +func TestMetricssNotMultiplexingMutating(t *testing.T) { + p := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} + lfc := NewMetrics([]consumer.Metrics{p}) + assert.True(t, lfc.Capabilities().MutatesData) +} + func TestMetricsMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.MetricsSink) p2 := new(consumertest.MetricsSink) @@ -68,7 +74,7 @@ func TestMetricsMultiplexingMutating(t *testing.T) { p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) - assert.False(t, mfc.Capabilities().MutatesData) + assert.True(t, mfc.Capabilities().MutatesData) md := testdata.GenerateMetrics(1) for i := 0; i < 2; i++ { @@ -105,7 +111,7 @@ func TestReadOnlyMetricsMultiplexingMixFirstMutating(t *testing.T) { p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) - assert.False(t, mfc.Capabilities().MutatesData) + assert.True(t, mfc.Capabilities().MutatesData) mdOrig := testdata.GenerateMetrics(1) md := testdata.GenerateMetrics(1) md.MarkReadOnly() @@ -142,7 +148,7 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) { p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)} mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) - assert.False(t, mfc.Capabilities().MutatesData) + assert.True(t, mfc.Capabilities().MutatesData) md := testdata.GenerateMetrics(1) for i := 0; i < 2; i++ { @@ -180,7 +186,7 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) { p3 := new(consumertest.MetricsSink) mfc := NewMetrics([]consumer.Metrics{p1, p2, p3}) - assert.False(t, mfc.Capabilities().MutatesData) + assert.True(t, mfc.Capabilities().MutatesData) md := testdata.GenerateMetrics(1) for i := 0; i < 2; i++ { diff --git a/internal/fanoutconsumer/traces.go b/internal/fanoutconsumer/traces.go index c8d0871d0a2..69dac49ddca 100644 --- a/internal/fanoutconsumer/traces.go +++ b/internal/fanoutconsumer/traces.go @@ -42,7 +42,7 @@ type tracesConsumer struct { } func (tsc *tracesConsumer) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} + return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0} } // ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one. diff --git a/internal/fanoutconsumer/traces_test.go b/internal/fanoutconsumer/traces_test.go index 872f4986ef2..f04084882dc 100644 --- a/internal/fanoutconsumer/traces_test.go +++ b/internal/fanoutconsumer/traces_test.go @@ -26,6 +26,12 @@ func TestTracesNotMultiplexing(t *testing.T) { assert.Same(t, nop, tfc) } +func TestTracesNotMultiplexingMutating(t *testing.T) { + p := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} + lfc := NewTraces([]consumer.Traces{p}) + assert.True(t, lfc.Capabilities().MutatesData) +} + func TestTracesMultiplexingNonMutating(t *testing.T) { p1 := new(consumertest.TracesSink) p2 := new(consumertest.TracesSink) @@ -68,7 +74,7 @@ func TestTracesMultiplexingMutating(t *testing.T) { p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} tfc := NewTraces([]consumer.Traces{p1, p2, p3}) - assert.False(t, tfc.Capabilities().MutatesData) + assert.True(t, tfc.Capabilities().MutatesData) td := testdata.GenerateTraces(1) for i := 0; i < 2; i++ { @@ -105,7 +111,7 @@ func TestReadOnlyTracesMultiplexingMutating(t *testing.T) { p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} tfc := NewTraces([]consumer.Traces{p1, p2, p3}) - assert.False(t, tfc.Capabilities().MutatesData) + assert.True(t, tfc.Capabilities().MutatesData) tdOrig := testdata.GenerateTraces(1) td := testdata.GenerateTraces(1) @@ -143,7 +149,7 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) { p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)} tfc := NewTraces([]consumer.Traces{p1, p2, p3}) - assert.False(t, tfc.Capabilities().MutatesData) + assert.True(t, tfc.Capabilities().MutatesData) td := testdata.GenerateTraces(1) for i := 0; i < 2; i++ { @@ -181,7 +187,7 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) { p3 := new(consumertest.TracesSink) tfc := NewTraces([]consumer.Traces{p1, p2, p3}) - assert.False(t, tfc.Capabilities().MutatesData) + assert.True(t, tfc.Capabilities().MutatesData) td := testdata.GenerateTraces(1) for i := 0; i < 2; i++ { diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 1c3c5392bde..9e69eab3cf6 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -288,7 +288,10 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { case *connectorNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID())) case *capabilitiesNode: - capability := consumer.Capabilities{MutatesData: false} + capability := consumer.Capabilities{ + // The fanOutNode represents the aggregate capabilities of the exporters in the pipeline. + MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData, + } for _, proc := range g.pipelines[n.pipelineID].processors { capability.MutatesData = capability.MutatesData || proc.getConsumer().Capabilities().MutatesData } @@ -319,7 +322,6 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { case component.DataTypeMetrics: consumers := make([]consumer.Metrics, 0, len(nexts)) for _, next := range nexts { - consumers = append(consumers, next.(consumer.Metrics)) } n.baseConsumer = fanoutconsumer.NewMetrics(consumers) diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index b198d3b81e6..cea340a2f51 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "testing" @@ -384,11 +385,11 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewIDWithName("traces", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewID("exampleconnector")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewID("exampleconnector")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewID("exampleexporter")}, }, }, @@ -400,11 +401,11 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewIDWithName("metrics", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewID("exampleconnector")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("metrics", "out"): { - Receivers: []component.ID{component.NewID("exampleconnector")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewID("exampleexporter")}, }, }, @@ -416,11 +417,11 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewIDWithName("logs", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewID("exampleconnector")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("logs", "out"): { - Receivers: []component.ID{component.NewID("exampleconnector")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewID("exampleexporter")}, }, }, @@ -432,16 +433,16 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewIDWithName("traces", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("traces", "type0"): { - Receivers: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.NewID("exampleprocessor")}, Exporters: []component.ID{component.NewIDWithName("exampleconnector", "merge")}, }, component.NewIDWithName("traces", "type1"): { - Receivers: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewIDWithName("exampleconnector", "merge")}, }, component.NewIDWithName("traces", "out"): { @@ -458,16 +459,16 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewIDWithName("metrics", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("metrics", "type0"): { - Receivers: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.NewID("exampleprocessor")}, Exporters: []component.ID{component.NewIDWithName("exampleconnector", "merge")}, }, component.NewIDWithName("metrics", "type1"): { - Receivers: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewIDWithName("exampleconnector", "merge")}, }, component.NewIDWithName("metrics", "out"): { @@ -484,16 +485,16 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewIDWithName("logs", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("logs", "type0"): { - Receivers: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.NewID("exampleprocessor")}, Exporters: []component.ID{component.NewIDWithName("exampleconnector", "merge")}, }, component.NewIDWithName("logs", "type1"): { - Receivers: []component.ID{component.NewIDWithName("exampleconnector", "fork")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewIDWithName("exampleconnector", "merge")}, }, component.NewIDWithName("logs", "out"): { @@ -573,30 +574,30 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewIDWithName("traces", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewID("exampleconnector")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("metrics", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, - Exporters: []component.ID{component.NewID("exampleconnector")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("logs", "in"): { Receivers: []component.ID{component.NewID("examplereceiver")}, Processors: []component.ID{component.NewID("exampleprocessor")}, - Exporters: []component.ID{component.NewID("exampleconnector")}, // wrapped w/ mutates: true + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, }, component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewID("exampleconnector")}, - Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector Exporters: []component.ID{component.NewID("exampleexporter")}, }, component.NewIDWithName("metrics", "out"): { - Receivers: []component.ID{component.NewID("exampleconnector")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.NewID("exampleprocessor")}, Exporters: []component.ID{component.NewID("exampleexporter")}, }, component.NewIDWithName("logs", "out"): { - Receivers: []component.ID{component.NewID("exampleconnector")}, + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, Exporters: []component.ID{component.NewID("exampleexporter")}, }, @@ -633,6 +634,84 @@ func TestConnectorPipelinesGraph(t *testing.T) { }, expectedPerExporter: 1, }, + { + name: "pipelines_conn_mutate_traces.yaml", + pipelineConfigs: pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("examplereceiver")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + }, + component.NewIDWithName("traces", "out0"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + component.NewIDWithName("traces", "middle"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "mutate")}, + }, + component.NewIDWithName("traces", "out1"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + }, + expectedPerExporter: 2, + }, + { + name: "pipelines_conn_mutate_metrics.yaml", + pipelineConfigs: pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("examplereceiver")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + }, + component.NewIDWithName("metrics", "out0"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + component.NewIDWithName("metrics", "middle"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "mutate")}, + }, + component.NewIDWithName("metrics", "out1"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + }, + expectedPerExporter: 2, + }, + { + name: "pipelines_conn_mutate_logs.yaml", + pipelineConfigs: pipelines.Config{ + component.NewIDWithName("logs", "in"): { + Receivers: []component.ID{component.NewID("examplereceiver")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + }, + component.NewIDWithName("logs", "out0"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + component.NewIDWithName("logs", "middle"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewIDWithName("exampleconnector", "mutate")}, + }, + component.NewIDWithName("logs", "out1"): { + Receivers: []component.ID{component.NewIDWithName("exampleconnector", "mutate")}, + Processors: []component.ID{component.NewID("exampleprocessor")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + }, + expectedPerExporter: 2, + }, } for _, test := range tests { @@ -670,10 +749,11 @@ func TestConnectorPipelinesGraph(t *testing.T) { ), ConnectorBuilder: connector.NewBuilder( map[component.ID]component.Config{ - component.NewID("exampleconnector"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), - component.NewIDWithName("exampleconnector", "fork"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), - component.NewIDWithName("exampleconnector", "merge"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), - component.NewID("mockforward"): testcomponents.MockForwardConnectorFactory.CreateDefaultConfig(), + component.NewID("exampleconnector"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), + component.NewIDWithName("exampleconnector", "merge"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), + component.NewIDWithName("exampleconnector", "mutate"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), + component.NewIDWithName("exampleconnector", "inherit_mutate"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), + component.NewID("mockforward"): testcomponents.MockForwardConnectorFactory.CreateDefaultConfig(), }, map[component.Type]connector.Factory{ testcomponents.ExampleConnectorFactory.Type(): testcomponents.ExampleConnectorFactory, @@ -700,6 +780,11 @@ func TestConnectorPipelinesGraph(t *testing.T) { // Determine independently if the capabilities node should report MutateData as true var expectMutatesData bool + for _, expr := range pipelineCfg.Exporters { + if strings.Contains(expr.Name(), "mutate") { + expectMutatesData = true + } + } for _, proc := range pipelineCfg.Processors { if proc.Name() == "mutate" { expectMutatesData = true @@ -767,6 +852,9 @@ func TestConnectorPipelinesGraph(t *testing.T) { if !ok { continue } + if expConn.getConsumer().Capabilities().MutatesData { + continue + } // find all the pipelines of the same type where this connector is a receiver var inheritMutatesData bool for recPipelineID, recPipeline := range pg.pipelines { diff --git a/service/internal/graph/nodes.go b/service/internal/graph/nodes.go index 7f7fda70f85..eb8e886b554 100644 --- a/service/internal/graph/nodes.go +++ b/service/internal/graph/nodes.go @@ -255,6 +255,9 @@ func (n *connectorNode) buildComponent( n.Component = conn // When connecting pipelines of the same data type, the connector must // inherit the capabilities of pipelines in which it is acting as a receiver. + // Since the incoming and outgoing data types are the same, we must also consider + // that the connector itself may MutatesData. + capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData n.baseConsumer = capabilityconsumer.NewTraces(conn, capability) case component.DataTypeMetrics: conn, err := builder.CreateMetricsToTraces(ctx, set, next) @@ -294,6 +297,9 @@ func (n *connectorNode) buildComponent( n.Component = conn // When connecting pipelines of the same data type, the connector must // inherit the capabilities of pipelines in which it is acting as a receiver. + // Since the incoming and outgoing data types are the same, we must also consider + // that the connector itself may MutatesData. + capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData n.baseConsumer = capabilityconsumer.NewMetrics(conn, capability) case component.DataTypeLogs: conn, err := builder.CreateLogsToMetrics(ctx, set, next) @@ -332,6 +338,9 @@ func (n *connectorNode) buildComponent( n.Component = conn // When connecting pipelines of the same data type, the connector must // inherit the capabilities of pipelines in which it is acting as a receiver. + // Since the incoming and outgoing data types are the same, we must also consider + // that the connector itself may MutatesData. + capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData n.baseConsumer = capabilityconsumer.NewLogs(conn, capability) } } diff --git a/service/internal/testcomponents/example_connector.go b/service/internal/testcomponents/example_connector.go index ccf6d678d28..c5c10b92b78 100644 --- a/service/internal/testcomponents/example_connector.go +++ b/service/internal/testcomponents/example_connector.go @@ -47,69 +47,78 @@ func createExampleConnectorDefaultConfig() component.Config { return &struct{}{} } -func createExampleTracesToTraces(_ context.Context, _ connector.CreateSettings, _ component.Config, traces consumer.Traces) (connector.Traces, error) { +func createExampleTracesToTraces(_ context.Context, set connector.CreateSettings, _ component.Config, traces consumer.Traces) (connector.Traces, error) { return &ExampleConnector{ ConsumeTracesFunc: traces.ConsumeTraces, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleTracesToMetrics(_ context.Context, _ connector.CreateSettings, _ component.Config, metrics consumer.Metrics) (connector.Traces, error) { +func createExampleTracesToMetrics(_ context.Context, set connector.CreateSettings, _ component.Config, metrics consumer.Metrics) (connector.Traces, error) { return &ExampleConnector{ ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error { return metrics.ConsumeMetrics(ctx, testdata.GenerateMetrics(td.SpanCount())) }, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleTracesToLogs(_ context.Context, _ connector.CreateSettings, _ component.Config, logs consumer.Logs) (connector.Traces, error) { +func createExampleTracesToLogs(_ context.Context, set connector.CreateSettings, _ component.Config, logs consumer.Logs) (connector.Traces, error) { return &ExampleConnector{ ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error { return logs.ConsumeLogs(ctx, testdata.GenerateLogs(td.SpanCount())) }, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleMetricsToTraces(_ context.Context, _ connector.CreateSettings, _ component.Config, traces consumer.Traces) (connector.Metrics, error) { +func createExampleMetricsToTraces(_ context.Context, set connector.CreateSettings, _ component.Config, traces consumer.Traces) (connector.Metrics, error) { return &ExampleConnector{ ConsumeMetricsFunc: func(ctx context.Context, md pmetric.Metrics) error { return traces.ConsumeTraces(ctx, testdata.GenerateTraces(md.MetricCount())) }, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleMetricsToMetrics(_ context.Context, _ connector.CreateSettings, _ component.Config, metrics consumer.Metrics) (connector.Metrics, error) { +func createExampleMetricsToMetrics(_ context.Context, set connector.CreateSettings, _ component.Config, metrics consumer.Metrics) (connector.Metrics, error) { return &ExampleConnector{ ConsumeMetricsFunc: metrics.ConsumeMetrics, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleMetricsToLogs(_ context.Context, _ connector.CreateSettings, _ component.Config, logs consumer.Logs) (connector.Metrics, error) { +func createExampleMetricsToLogs(_ context.Context, set connector.CreateSettings, _ component.Config, logs consumer.Logs) (connector.Metrics, error) { return &ExampleConnector{ ConsumeMetricsFunc: func(ctx context.Context, md pmetric.Metrics) error { return logs.ConsumeLogs(ctx, testdata.GenerateLogs(md.MetricCount())) }, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleLogsToTraces(_ context.Context, _ connector.CreateSettings, _ component.Config, traces consumer.Traces) (connector.Logs, error) { +func createExampleLogsToTraces(_ context.Context, set connector.CreateSettings, _ component.Config, traces consumer.Traces) (connector.Logs, error) { return &ExampleConnector{ ConsumeLogsFunc: func(ctx context.Context, ld plog.Logs) error { return traces.ConsumeTraces(ctx, testdata.GenerateTraces(ld.LogRecordCount())) }, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleLogsToMetrics(_ context.Context, _ connector.CreateSettings, _ component.Config, metrics consumer.Metrics) (connector.Logs, error) { +func createExampleLogsToMetrics(_ context.Context, set connector.CreateSettings, _ component.Config, metrics consumer.Metrics) (connector.Logs, error) { return &ExampleConnector{ ConsumeLogsFunc: func(ctx context.Context, ld plog.Logs) error { return metrics.ConsumeMetrics(ctx, testdata.GenerateMetrics(ld.LogRecordCount())) }, + mutatesData: set.ID.Name() == "mutate", }, nil } -func createExampleLogsToLogs(_ context.Context, _ connector.CreateSettings, _ component.Config, logs consumer.Logs) (connector.Logs, error) { +func createExampleLogsToLogs(_ context.Context, set connector.CreateSettings, _ component.Config, logs consumer.Logs) (connector.Logs, error) { return &ExampleConnector{ ConsumeLogsFunc: logs.ConsumeLogs, + mutatesData: set.ID.Name() == "mutate", }, nil } @@ -118,8 +127,9 @@ type ExampleConnector struct { consumer.ConsumeTracesFunc consumer.ConsumeMetricsFunc consumer.ConsumeLogsFunc + mutatesData bool } func (c *ExampleConnector) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} + return consumer.Capabilities{MutatesData: c.mutatesData} }