Skip to content

Commit

Permalink
Add helpers to construct connector.*Routers for tests (#7673)
Browse files Browse the repository at this point in the history
This PR adds helpers to `connectortest` to aid the construction of
`connector.*Router`s for testing connectors. This was implemented based
on @djaglowski's [comment
here](open-telemetry/opentelemetry-collector-contrib#21498 (comment)),
with some slight modifications. I found it more ergonomic to pass the
sink into the `WithTracesSink` (and similar) options. You usually want a
handle on the sink after creating the router. While it's possible to get
the sink out of the router after the fact, it's a little cumbersome.

These helpers will be useful for
open-telemetry/opentelemetry-collector-contrib#21498
and future connectors that need use of routers.

For example, here is a test with the consumer passed in:

```go
func TestFanoutTracesWithSink(t *testing.T) {
    var sink0, sink1 consumertest.TracesSink

    tr, err := NewTracesRouterSink(
        WithTracesSink(component.NewIDWithName(component.DataTypeTraces, "0"), &sink0),
        WithTracesSink(component.NewIDWithName(component.DataTypeTraces, "1"), &sink1),
    )

    require.NoError(t, err)
    require.Equal(t, 0, sink0.SpanCount())
    require.Equal(t, 0, sink1.SpanCount())

    td := testdata.GenerateTraces(1)
    err = tr.(consumer.Traces).ConsumeTraces(context.Background(), td)

    require.NoError(t, err)
    require.Equal(t, 1, sink0.SpanCount())
    require.Equal(t, 1, sink1.SpanCount())
}

```

The same test having to extract the consumer out after the fact:

```go
func TestFanoutTracesWithSink(t *testing.T) {
    traces0 := component.NewIDWithName(component.DataTypeTraces, "0")
    traces1 := component.NewIDWithName(component.DataTypeTraces, "1")

    tr, err := NewTracesRouterSink(
        WithTracesSink(traces0),
        WithTracesSink(traces1),
    )
   
    require.NoError(t, err)
   
    cons0, _ := tr.Consumer(traces0)
    sink0 := cons0.(*consumertest.TracesSink)
    cons1, _ := tr.Consumer(traces1)
    sink1 := cons1.(*consumertest.TracesSink)

    require.Equal(t, 0, sink0.SpanCount())
    require.Equal(t, 0, sink1.SpanCount())

    td := testdata.GenerateTraces(1)
    err = tr.(consumer.Traces).ConsumeTraces(context.Background(), td)

    require.NoError(t, err)
    require.Equal(t, 1, sink0.SpanCount())
    require.Equal(t, 1, sink1.SpanCount())}
}
```


**Link to tracking Issue:** 
#7672

**Testing:**
Unit tests

**Documentation:**
Source code comments

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
mwear and djaglowski authored Jun 22, 2023
1 parent 479a328 commit fd72651
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 6 deletions.
16 changes: 16 additions & 0 deletions .chloggen/connector_router_test_helpers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: connectortest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add helpers to aid the construction of `connector.TracesRouter`, `connector.MetricsRouter`, and `connector.LogsRouter` instances to `connectortest`.

# One or more tracking issues or pull requests related to the change
issues: [7672]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
84 changes: 84 additions & 0 deletions connector/connectortest/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connectortest // import "go.opentelemetry.io/collector/connector/connectortest"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
)

type TracesRouterOption struct {
id component.ID
cons consumer.Traces
}

// WithNopTraces creates a nop consumer for a connector.TracesRouter
func WithNopTraces(id component.ID) TracesRouterOption {
return TracesRouterOption{id: id, cons: consumertest.NewNop()}
}

// WithTracesSink adds a consumer to a connector.TracesRouter
func WithTracesSink(id component.ID, sink *consumertest.TracesSink) TracesRouterOption {
return TracesRouterOption{id: id, cons: sink}
}

// NewTracesRouter returns a connector.TracesRouter with sinks based on the options provided
func NewTracesRouter(opts ...TracesRouterOption) connector.TracesRouter {
consumers := make(map[component.ID]consumer.Traces)
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewTracesRouter(consumers).(connector.TracesRouter)
}

type MetricsRouterOption struct {
id component.ID
cons consumer.Metrics
}

// WithNopMetrics creates a nop consumer for a connector.MetricsRouter
func WithNopMetrics(id component.ID) MetricsRouterOption {
return MetricsRouterOption{id: id, cons: consumertest.NewNop()}
}

// WithMetricsSink adds a consumer to a connector.MetricsRouter
func WithMetricsSink(id component.ID, sink *consumertest.MetricsSink) MetricsRouterOption {
return MetricsRouterOption{id: id, cons: sink}
}

// NewMetricsRouter returns a connector.MetricsRouter with sinks based on the options provided
func NewMetricsRouter(opts ...MetricsRouterOption) connector.MetricsRouter {
consumers := make(map[component.ID]consumer.Metrics)
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewMetricsRouter(consumers).(connector.MetricsRouter)
}

type LogsRouterOption struct {
id component.ID
cons consumer.Logs
}

// WithNopLogs creates a nop consumer for a connector.LogsRouter
func WithNopLogs(id component.ID) LogsRouterOption {
return LogsRouterOption{id: id, cons: consumertest.NewNop()}
}

// WithLogsSink adds a consumer to a connector.LogsRouter
func WithLogsSink(id component.ID, sink *consumertest.LogsSink) LogsRouterOption {
return LogsRouterOption{id: id, cons: sink}
}

// NewLogsRouter returns a connector.LogsRouter with sinks based on the options provided
func NewLogsRouter(opts ...LogsRouterOption) connector.LogsRouter {
consumers := make(map[component.ID]consumer.Logs)
for _, opt := range opts {
consumers[opt.id] = opt.cons
}
return fanoutconsumer.NewLogsRouter(consumers).(connector.LogsRouter)
}
109 changes: 109 additions & 0 deletions connector/connectortest/router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package connectortest

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
)

func TestTracesRouterWithNop(t *testing.T) {
tr := NewTracesRouter(
WithNopTraces(component.NewIDWithName(component.DataTypeTraces, "0")),
WithNopTraces(component.NewIDWithName(component.DataTypeTraces, "1")),
)

td := testdata.GenerateTraces(1)
err := tr.(consumer.Traces).ConsumeTraces(context.Background(), td)

require.NoError(t, err)
}

func TestTracesRouterWithSink(t *testing.T) {
var sink0, sink1 consumertest.TracesSink

tr := NewTracesRouter(
WithTracesSink(component.NewIDWithName(component.DataTypeTraces, "0"), &sink0),
WithTracesSink(component.NewIDWithName(component.DataTypeTraces, "1"), &sink1),
)

require.Equal(t, 0, sink0.SpanCount())
require.Equal(t, 0, sink1.SpanCount())

td := testdata.GenerateTraces(1)
err := tr.(consumer.Traces).ConsumeTraces(context.Background(), td)

require.NoError(t, err)
require.Equal(t, 1, sink0.SpanCount())
require.Equal(t, 1, sink1.SpanCount())
}

func TestMetricsRouterWithNop(t *testing.T) {
mr := NewMetricsRouter(
WithNopMetrics(component.NewIDWithName(component.DataTypeMetrics, "0")),
WithNopMetrics(component.NewIDWithName(component.DataTypeMetrics, "1")),
)

md := testdata.GenerateMetrics(1)
err := mr.(consumer.Metrics).ConsumeMetrics(context.Background(), md)

require.NoError(t, err)
}

func TestMetricsRouterWithSink(t *testing.T) {
var sink0, sink1 consumertest.MetricsSink

mr := NewMetricsRouter(
WithMetricsSink(component.NewIDWithName(component.DataTypeMetrics, "0"), &sink0),
WithMetricsSink(component.NewIDWithName(component.DataTypeMetrics, "1"), &sink1),
)

require.Len(t, sink0.AllMetrics(), 0)
require.Len(t, sink1.AllMetrics(), 0)

md := testdata.GenerateMetrics(1)
err := mr.(consumer.Metrics).ConsumeMetrics(context.Background(), md)

require.NoError(t, err)
require.Len(t, sink0.AllMetrics(), 1)
require.Len(t, sink1.AllMetrics(), 1)
}

func TestLogsRouterWithNop(t *testing.T) {
lr := NewLogsRouter(
WithNopLogs(component.NewIDWithName(component.DataTypeLogs, "0")),
WithNopLogs(component.NewIDWithName(component.DataTypeLogs, "1")),
)

ld := testdata.GenerateLogs(1)
err := lr.(consumer.Logs).ConsumeLogs(context.Background(), ld)

require.NoError(t, err)
}

func TestLogsRouterWithSink(t *testing.T) {
var sink0, sink1 consumertest.LogsSink

lr := NewLogsRouter(
WithLogsSink(component.NewIDWithName(component.DataTypeLogs, "0"), &sink0),
WithLogsSink(component.NewIDWithName(component.DataTypeLogs, "1"), &sink1),
)

require.Equal(t, 0, sink0.LogRecordCount())
require.Equal(t, 0, sink1.LogRecordCount())

ld := testdata.GenerateLogs(1)
err := lr.(consumer.Logs).ConsumeLogs(context.Background(), ld)

require.NoError(t, err)
require.Equal(t, 1, sink0.LogRecordCount())
require.Equal(t, 1, sink1.LogRecordCount())
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

// Package fanoutconsumer contains implementations of Traces/Metrics/Logs consumers
// that fan out the data to multiple other consumers.
package fanoutconsumer // import "go.opentelemetry.io/collector/service/internal/fanoutconsumer"
package fanoutconsumer // import "go.opentelemetry.io/collector/internal/fanoutconsumer"

import (
"context"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fanoutconsumer // import "go.opentelemetry.io/collector/service/internal/fanoutconsumer"
package fanoutconsumer // import "go.opentelemetry.io/collector/internal/fanoutconsumer"

import (
"context"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fanoutconsumer // import "go.opentelemetry.io/collector/service/internal/fanoutconsumer"
package fanoutconsumer // import "go.opentelemetry.io/collector/internal/fanoutconsumer"

import (
"context"
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
"go.opentelemetry.io/collector/service/pipelines"
)

Expand Down
2 changes: 1 addition & 1 deletion service/internal/graph/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion service/internal/testcomponents/example_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/service/internal/fanoutconsumer"
)

func TestExampleRouter(t *testing.T) {
Expand Down

0 comments on commit fd72651

Please sign in to comment.