Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routingprocessor: avoid duplicate code with consumertest #9804

Merged
merged 1 commit into from
May 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 56 additions & 111 deletions processor/routingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package routingprocessor
import (
"context"
"errors"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,7 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -221,26 +220,26 @@ func TestTraces_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {

// The numbers below stem from the fact that data is routed and grouped
// per resource attribute which is used for routing.
// Hence the first 2 traces are grouped together under one plog.Logs.
assert.Equal(t, 1, defaultExp.getTraceCount(),
"one log should be routed to default exporter",
// Hence the first 2 traces are grouped together under one ptrace.Traces.
assert.Len(t, defaultExp.AllTraces(), 1,
"one trace should be routed to default exporter",
)
assert.Equal(t, 1, tExp.getTraceCount(),
"one log should be routed to non default exporter",
assert.Len(t, tExp.AllTraces(), 1,
"one trace should be routed to non default exporter",
)
}

func TestTraces_RoutingWorks_Context(t *testing.T) {
defaultExp := &mockTracesExporter{}
lExp := &mockTracesExporter{}
tExp := &mockTracesExporter{}

host := &mockHost{
Host: componenttest.NewNopHost(),
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): lExp,
config.NewComponentID("otlp/2"): tExp,
},
}
},
Expand Down Expand Up @@ -270,10 +269,10 @@ func TestTraces_RoutingWorks_Context(t *testing.T) {
})),
tr,
))
assert.Equal(t, 0, defaultExp.getTraceCount(),
assert.Len(t, defaultExp.AllTraces(), 0,
"trace should not be routed to default exporter",
)
assert.Equal(t, 1, lExp.getTraceCount(),
assert.Len(t, tExp.AllTraces(), 1,
"trace should be routed to non default exporter",
)
})
Expand All @@ -285,26 +284,26 @@ func TestTraces_RoutingWorks_Context(t *testing.T) {
})),
tr,
))
assert.Equal(t, 1, defaultExp.getTraceCount(),
assert.Len(t, defaultExp.AllTraces(), 1,
"trace should be routed to default exporter",
)
assert.Equal(t, 1, lExp.getTraceCount(),
assert.Len(t, tExp.AllTraces(), 1,
"trace should not be routed to non default exporter",
)
})
}

func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) {
defaultExp := &mockTracesExporter{}
mExp := &mockTracesExporter{}
tExp := &mockTracesExporter{}

host := &mockHost{
Host: componenttest.NewNopHost(),
GetExportersFunc: func() map[config.DataType]map[config.ComponentID]component.Exporter {
return map[config.DataType]map[config.ComponentID]component.Exporter{
config.TracesDataType: {
config.NewComponentID("otlp"): defaultExp,
config.NewComponentID("otlp/2"): mExp,
config.NewComponentID("otlp/2"): tExp,
},
}
},
Expand All @@ -329,10 +328,10 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) {
rs.Resource().Attributes().InsertString("X-Tenant", "acme")

assert.NoError(t, exp.ConsumeTraces(context.Background(), tr))
assert.Equal(t, 0, defaultExp.getTraceCount(),
assert.Len(t, defaultExp.AllTraces(), 0,
"trace should not be routed to default exporter",
)
assert.Equal(t, 1, mExp.getTraceCount(),
assert.Len(t, tExp.AllTraces(), 1,
"trace should be routed to non default exporter",
)
})
Expand All @@ -343,10 +342,10 @@ func TestTraces_RoutingWorks_ResourceAttribute(t *testing.T) {
rs.Resource().Attributes().InsertString("X-Tenant", "some-custom-value")

assert.NoError(t, exp.ConsumeTraces(context.Background(), tr))
assert.Equal(t, 1, defaultExp.getTraceCount(),
assert.Len(t, defaultExp.AllTraces(), 1,
"trace should be routed to default exporter",
)
assert.Equal(t, 1, mExp.getTraceCount(),
assert.Len(t, tExp.AllTraces(), 1,
"trace should not be routed to non default exporter",
)
})
Expand Down Expand Up @@ -388,12 +387,12 @@ func TestTraces_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.
rm.Resource().Attributes().InsertString("attr", "acme")

assert.NoError(t, exp.ConsumeTraces(context.Background(), tr))
assert.Equal(t, 1, tExp.getTraceCount(),
traces := tExp.AllTraces()
require.Len(t, traces, 1,
"trace should be routed to non default exporter",
)
require.Len(t, tExp.traces, 1)
require.Equal(t, 1, tExp.traces[0].ResourceSpans().Len())
attrs := tExp.traces[0].ResourceSpans().At(0).Resource().Attributes()
require.Equal(t, 1, traces[0].ResourceSpans().Len())
attrs := traces[0].ResourceSpans().At(0).Resource().Attributes()
_, ok := attrs.Get("X-Tenant")
assert.False(t, ok, "routing attribute should have been dropped")
v, ok := attrs.Get("attr")
Expand Down Expand Up @@ -474,10 +473,10 @@ func TestMetrics_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
// The numbers below stem from the fact that data is routed and grouped
// per resource attribute which is used for routing.
// Hence the first 2 metrics are grouped together under one pmetric.Metrics.
assert.Equal(t, 1, defaultExp.getMetricCount(),
assert.Len(t, defaultExp.AllMetrics(), 1,
"one metric should be routed to default exporter",
)
assert.Equal(t, 1, mExp.getMetricCount(),
assert.Len(t, mExp.AllMetrics(), 1,
"one metric should be routed to non default exporter",
)
}
Expand Down Expand Up @@ -522,10 +521,10 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) {
})),
m,
))
assert.Equal(t, 0, defaultExp.getMetricCount(),
assert.Len(t, defaultExp.AllMetrics(), 0,
"metric should not be routed to default exporter",
)
assert.Equal(t, 1, mExp.getMetricCount(),
assert.Len(t, mExp.AllMetrics(), 1,
"metric should be routed to non default exporter",
)
})
Expand All @@ -537,10 +536,10 @@ func TestMetrics_RoutingWorks_Context(t *testing.T) {
})),
m,
))
assert.Equal(t, 1, defaultExp.getMetricCount(),
assert.Len(t, defaultExp.AllMetrics(), 1,
"metric should be routed to default exporter",
)
assert.Equal(t, 1, mExp.getMetricCount(),
assert.Len(t, mExp.AllMetrics(), 1,
"metric should not be routed to non default exporter",
)
})
Expand Down Expand Up @@ -581,10 +580,10 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) {
rm.Resource().Attributes().InsertString("X-Tenant", "acme")

assert.NoError(t, exp.ConsumeMetrics(context.Background(), m))
assert.Equal(t, 0, defaultExp.getMetricCount(),
assert.Len(t, defaultExp.AllMetrics(), 0,
"metric should not be routed to default exporter",
)
assert.Equal(t, 1, mExp.getMetricCount(),
assert.Len(t, mExp.AllMetrics(), 1,
"metric should be routed to non default exporter",
)
})
Expand All @@ -595,10 +594,10 @@ func TestMetrics_RoutingWorks_ResourceAttribute(t *testing.T) {
rm.Resource().Attributes().InsertString("X-Tenant", "some-custom-value")

assert.NoError(t, exp.ConsumeMetrics(context.Background(), m))
assert.Equal(t, 1, defaultExp.getMetricCount(),
assert.Len(t, defaultExp.AllMetrics(), 1,
"metric should be routed to default exporter",
)
assert.Equal(t, 1, mExp.getMetricCount(),
assert.Len(t, mExp.AllMetrics(), 1,
"metric should not be routed to non default exporter",
)
})
Expand Down Expand Up @@ -640,12 +639,10 @@ func TestMetrics_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing
rm.Resource().Attributes().InsertString("attr", "acme")

assert.NoError(t, exp.ConsumeMetrics(context.Background(), m))
assert.Equal(t, 1, mExp.getMetricCount(),
"metric should be routed to non default exporter",
)
require.Len(t, mExp.metrics, 1)
require.Equal(t, 1, mExp.metrics[0].ResourceMetrics().Len())
attrs := mExp.metrics[0].ResourceMetrics().At(0).Resource().Attributes()
metrics := mExp.AllMetrics()
require.Len(t, metrics, 1, "metric should be routed to non default exporter")
require.Equal(t, 1, metrics[0].ResourceMetrics().Len())
attrs := metrics[0].ResourceMetrics().At(0).Resource().Attributes()
_, ok := attrs.Get("X-Tenant")
assert.False(t, ok, "routing attribute should have been dropped")
v, ok := attrs.Get("attr")
Expand Down Expand Up @@ -693,10 +690,10 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
})),
l,
))
assert.Equal(t, 0, defaultExp.getLogCount(),
assert.Len(t, defaultExp.AllLogs(), 0,
"log should not be routed to default exporter",
)
assert.Equal(t, 1, lExp.getLogCount(),
assert.Len(t, lExp.AllLogs(), 1,
"log should be routed to non default exporter",
)
})
Expand All @@ -708,10 +705,10 @@ func TestLogs_RoutingWorks_Context(t *testing.T) {
})),
l,
))
assert.Equal(t, 1, defaultExp.getLogCount(),
assert.Len(t, defaultExp.AllLogs(), 1,
"log should be routed to default exporter",
)
assert.Equal(t, 1, lExp.getLogCount(),
assert.Len(t, lExp.AllLogs(), 1,
"log should not be routed to non default exporter",
)
})
Expand Down Expand Up @@ -752,10 +749,10 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
rl.Resource().Attributes().InsertString("X-Tenant", "acme")

assert.NoError(t, exp.ConsumeLogs(context.Background(), l))
assert.Equal(t, 0, defaultExp.getLogCount(),
assert.Len(t, defaultExp.AllLogs(), 0,
"log should not be routed to default exporter",
)
assert.Equal(t, 1, lExp.getLogCount(),
assert.Len(t, lExp.AllLogs(), 1,
"log should be routed to non default exporter",
)
})
Expand All @@ -766,10 +763,10 @@ func TestLogs_RoutingWorks_ResourceAttribute(t *testing.T) {
rl.Resource().Attributes().InsertString("X-Tenant", "some-custom-value")

assert.NoError(t, exp.ConsumeLogs(context.Background(), l))
assert.Equal(t, 1, defaultExp.getLogCount(),
assert.Len(t, defaultExp.AllLogs(), 1,
"log should be routed to default exporter",
)
assert.Equal(t, 1, lExp.getLogCount(),
assert.Len(t, lExp.AllLogs(), 1,
"log should not be routed to non default exporter",
)
})
Expand Down Expand Up @@ -811,12 +808,10 @@ func TestLogs_RoutingWorks_ResourceAttribute_DropsRoutingAttribute(t *testing.T)
rm.Resource().Attributes().InsertString("attr", "acme")

assert.NoError(t, exp.ConsumeLogs(context.Background(), l))
assert.Equal(t, 1, lExp.getLogCount(),
"log should be routed to non-default exporter",
)
require.Len(t, lExp.logs, 1)
require.Equal(t, 1, lExp.logs[0].ResourceLogs().Len())
attrs := lExp.logs[0].ResourceLogs().At(0).Resource().Attributes()
logs := lExp.AllLogs()
require.Len(t, logs, 1, "log should be routed to non-default exporter")
require.Equal(t, 1, logs[0].ResourceLogs().Len())
attrs := logs[0].ResourceLogs().At(0).Resource().Attributes()
_, ok := attrs.Get("X-Tenant")
assert.False(t, ok, "routing attribute should have been dropped")
v, ok := attrs.Get("attr")
Expand Down Expand Up @@ -873,10 +868,10 @@ func TestLogs_AreCorrectlySplitPerResourceAttributeRouting(t *testing.T) {
// The numbers below stem from the fact that data is routed and grouped
// per resource attribute which is used for routing.
// Hence the first 2 metrics are grouped together under one plog.Logs.
assert.Equal(t, 1, defaultExp.getLogCount(),
assert.Len(t, defaultExp.AllLogs(), 1,
"one log should be routed to default exporter",
)
assert.Equal(t, 1, lExp.getLogCount(),
assert.Len(t, lExp.AllLogs(), 1,
"one log should be routed to non default exporter",
)
}
Expand Down Expand Up @@ -941,72 +936,22 @@ func (m *mockHost) GetExporters() map[config.DataType]map[config.ComponentID]com
return m.Host.GetExporters()
}

type mockComponent struct{}

func (m *mockComponent) Start(context.Context, component.Host) error {
return nil
}

func (m *mockComponent) Shutdown(context.Context) error {
return nil
type mockComponent struct {
component.StartFunc
component.ShutdownFunc
}

type mockMetricsExporter struct {
mockComponent
metricCount int32
metrics []pmetric.Metrics
}

func (m *mockMetricsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockMetricsExporter) ConsumeMetrics(_ context.Context, metrics pmetric.Metrics) error {
atomic.AddInt32(&m.metricCount, 1)
m.metrics = append(m.metrics, metrics)
return nil
}

func (m *mockMetricsExporter) getMetricCount() int {
return int(atomic.LoadInt32(&m.metricCount))
consumertest.MetricsSink
}

type mockLogsExporter struct {
mockComponent
logCount int32
logs []plog.Logs
}

func (m *mockLogsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockLogsExporter) ConsumeLogs(_ context.Context, logs plog.Logs) error {
atomic.AddInt32(&m.logCount, 1)
m.logs = append(m.logs, logs)
return nil
}

func (m *mockLogsExporter) getLogCount() int {
return int(atomic.LoadInt32(&m.logCount))
consumertest.LogsSink
}

type mockTracesExporter struct {
mockComponent
traceCount int32
traces []ptrace.Traces
}

func (m *mockTracesExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (m *mockTracesExporter) ConsumeTraces(_ context.Context, traces ptrace.Traces) error {
atomic.AddInt32(&m.traceCount, 1)
m.traces = append(m.traces, traces)
return nil
}

func (m *mockTracesExporter) getTraceCount() int {
return int(atomic.LoadInt32(&m.traceCount))
consumertest.TracesSink
}