diff --git a/.chloggen/connectorprofiles-router.yaml b/.chloggen/connectorprofiles-router.yaml new file mode 100644 index 00000000000..0e3df7db51c --- /dev/null +++ b/.chloggen/connectorprofiles-router.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: connectorprofiles + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add ProfilesRouterAndConsumer interface, and NewProfilesRouter method. + +# One or more tracking issues or pull requests related to the change +issues: [11023] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/connector/connectorprofiles/go.mod b/connector/connectorprofiles/go.mod index ae38595f285..d25dde93266 100644 --- a/connector/connectorprofiles/go.mod +++ b/connector/connectorprofiles/go.mod @@ -4,12 +4,15 @@ go 1.22.0 require ( github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector v0.108.1 go.opentelemetry.io/collector/component v0.108.1 go.opentelemetry.io/collector/component/componentprofiles v0.108.1 go.opentelemetry.io/collector/connector v0.108.1 go.opentelemetry.io/collector/consumer v0.108.1 go.opentelemetry.io/collector/consumer/consumerprofiles v0.108.1 go.opentelemetry.io/collector/consumer/consumertest v0.108.1 + go.opentelemetry.io/collector/pdata/pprofile v0.108.1 + go.opentelemetry.io/collector/pdata/testdata v0.108.1 ) require ( @@ -19,10 +22,8 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/collector v0.108.1 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.108.1 // indirect go.opentelemetry.io/collector/pdata v1.14.1 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.108.1 // indirect go.opentelemetry.io/otel v1.29.0 // indirect go.opentelemetry.io/otel/metric v1.29.0 // indirect go.opentelemetry.io/otel/trace v1.29.0 // indirect diff --git a/connector/connectorprofiles/profiles_router.go b/connector/connectorprofiles/profiles_router.go new file mode 100644 index 00000000000..73e427f1d52 --- /dev/null +++ b/connector/connectorprofiles/profiles_router.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connectorprofiles // import "go.opentelemetry.io/collector/connector/connectorprofiles" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector/internal" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/internal/fanoutconsumer" +) + +// ProfilesRouterAndConsumer feeds the first consumerprofiles.Profiles in each of the specified pipelines. +type ProfilesRouterAndConsumer interface { + consumerprofiles.Profiles + Consumer(...component.ID) (consumerprofiles.Profiles, error) + PipelineIDs() []component.ID + privateFunc() +} + +type profilesRouter struct { + consumerprofiles.Profiles + internal.BaseRouter[consumerprofiles.Profiles] +} + +func NewProfilesRouter(cm map[component.ID]consumerprofiles.Profiles) ProfilesRouterAndConsumer { + consumers := make([]consumerprofiles.Profiles, 0, len(cm)) + for _, cons := range cm { + consumers = append(consumers, cons) + } + return &profilesRouter{ + Profiles: fanoutconsumer.NewProfiles(consumers), + BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewProfiles, cm), + } +} + +func (r *profilesRouter) privateFunc() {} diff --git a/connector/connectorprofiles/profiles_router_test.go b/connector/connectorprofiles/profiles_router_test.go new file mode 100644 index 00000000000..fcec379dfba --- /dev/null +++ b/connector/connectorprofiles/profiles_router_test.go @@ -0,0 +1,157 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connectorprofiles + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/testdata" +) + +type mutatingProfilesSink struct { + *consumertest.ProfilesSink +} + +func (mts *mutatingProfilesSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func TestProfilesRouterMultiplexing(t *testing.T) { + var max = 20 + for numIDs := 1; numIDs < max; numIDs++ { + for numCons := 1; numCons < max; numCons++ { + for numProfiles := 1; numProfiles < max; numProfiles++ { + t.Run( + fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numProfiles), + fuzzProfiles(numIDs, numCons, numProfiles), + ) + } + } + } +} + +func fuzzProfiles(numIDs, numCons, numProfiles int) func(*testing.T) { + return func(t *testing.T) { + allIDs := make([]component.ID, 0, numCons) + allCons := make([]consumerprofiles.Profiles, 0, numCons) + allConsMap := make(map[component.ID]consumerprofiles.Profiles) + + // If any consumer is mutating, the router must report mutating + for i := 0; i < numCons; i++ { + allIDs = append(allIDs, component.MustNewIDWithName("sink", strconv.Itoa(numCons))) + // Random chance for each consumer to be mutating + if (numCons+numProfiles+i)%4 == 0 { + allCons = append(allCons, &mutatingProfilesSink{ProfilesSink: new(consumertest.ProfilesSink)}) + } else { + allCons = append(allCons, new(consumertest.ProfilesSink)) + } + allConsMap[allIDs[i]] = allCons[i] + } + + r := NewProfilesRouter(allConsMap) + td := testdata.GenerateProfiles(1) + + // Keep track of how many logs each consumer should receive. + // This will be validated after every call to RouteProfiles. + expected := make(map[component.ID]int, numCons) + + for i := 0; i < numProfiles; i++ { + // Build a random set of ids (no duplicates) + randCons := make(map[component.ID]bool, numIDs) + for j := 0; j < numIDs; j++ { + // This number should be pretty random and less than numCons + conNum := (numCons + numIDs + i + j) % numCons + randCons[allIDs[conNum]] = true + } + + // Convert to slice, update expectations + conIDs := make([]component.ID, 0, len(randCons)) + for id := range randCons { + conIDs = append(conIDs, id) + expected[id]++ + } + + // Route to list of consumers + fanout, err := r.Consumer(conIDs...) + assert.NoError(t, err) + assert.NoError(t, fanout.ConsumeProfiles(context.Background(), td)) + + // Validate expectations for all consumers + for id := range expected { + profiles := []pprofile.Profiles{} + switch con := allConsMap[id].(type) { + case *consumertest.ProfilesSink: + profiles = con.AllProfiles() + case *mutatingProfilesSink: + profiles = con.AllProfiles() + } + assert.Len(t, profiles, expected[id]) + for n := 0; n < len(profiles); n++ { + assert.EqualValues(t, td, profiles[n]) + } + } + } + } +} + +func TestProfilessRouterConsumer(t *testing.T) { + ctx := context.Background() + td := testdata.GenerateProfiles(1) + + fooID := component.MustNewID("foo") + barID := component.MustNewID("bar") + + foo := new(consumertest.ProfilesSink) + bar := new(consumertest.ProfilesSink) + r := NewProfilesRouter(map[component.ID]consumerprofiles.Profiles{fooID: foo, barID: bar}) + + rcs := r.PipelineIDs() + assert.Len(t, rcs, 2) + assert.ElementsMatch(t, []component.ID{fooID, barID}, rcs) + + assert.Len(t, foo.AllProfiles(), 0) + assert.Len(t, bar.AllProfiles(), 0) + + both, err := r.Consumer(fooID, barID) + assert.NotNil(t, both) + assert.NoError(t, err) + + assert.NoError(t, both.ConsumeProfiles(ctx, td)) + assert.Len(t, foo.AllProfiles(), 1) + assert.Len(t, bar.AllProfiles(), 1) + + fooOnly, err := r.Consumer(fooID) + assert.NotNil(t, fooOnly) + assert.NoError(t, err) + + assert.NoError(t, fooOnly.ConsumeProfiles(ctx, td)) + assert.Len(t, foo.AllProfiles(), 2) + assert.Len(t, bar.AllProfiles(), 1) + + barOnly, err := r.Consumer(barID) + assert.NotNil(t, barOnly) + assert.NoError(t, err) + + assert.NoError(t, barOnly.ConsumeProfiles(ctx, td)) + assert.Len(t, foo.AllProfiles(), 2) + assert.Len(t, bar.AllProfiles(), 2) + + none, err := r.Consumer() + assert.Nil(t, none) + assert.Error(t, err) + + fake, err := r.Consumer(component.MustNewID("fake")) + assert.Nil(t, fake) + assert.Error(t, err) +} diff --git a/connector/router.go b/connector/internal/router.go similarity index 61% rename from connector/router.go rename to connector/internal/router.go index bba7ee76bba..18c684807cb 100644 --- a/connector/router.go +++ b/connector/internal/router.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package connector // import "go.opentelemetry.io/collector/connector" +package internal // import "go.opentelemetry.io/collector/connector/internal" import ( "fmt" @@ -11,28 +11,28 @@ import ( "go.opentelemetry.io/collector/component" ) -type baseRouter[T any] struct { +type BaseRouter[T any] struct { fanout func([]T) T - consumers map[component.ID]T + Consumers map[component.ID]T } -func newBaseRouter[T any](fanout func([]T) T, cm map[component.ID]T) baseRouter[T] { +func NewBaseRouter[T any](fanout func([]T) T, cm map[component.ID]T) BaseRouter[T] { consumers := make(map[component.ID]T, len(cm)) for k, v := range cm { consumers[k] = v } - return baseRouter[T]{fanout: fanout, consumers: consumers} + return BaseRouter[T]{fanout: fanout, Consumers: consumers} } -func (r *baseRouter[T]) PipelineIDs() []component.ID { - ids := make([]component.ID, 0, len(r.consumers)) - for id := range r.consumers { +func (r *BaseRouter[T]) PipelineIDs() []component.ID { + ids := make([]component.ID, 0, len(r.Consumers)) + for id := range r.Consumers { ids = append(ids, id) } return ids } -func (r *baseRouter[T]) Consumer(pipelineIDs ...component.ID) (T, error) { +func (r *BaseRouter[T]) Consumer(pipelineIDs ...component.ID) (T, error) { var ret T if len(pipelineIDs) == 0 { return ret, fmt.Errorf("missing consumers") @@ -40,7 +40,7 @@ func (r *baseRouter[T]) Consumer(pipelineIDs ...component.ID) (T, error) { consumers := make([]T, 0, len(pipelineIDs)) var errors error for _, pipelineID := range pipelineIDs { - c, ok := r.consumers[pipelineID] + c, ok := r.Consumers[pipelineID] if ok { consumers = append(consumers, c) } else { diff --git a/connector/logs_router.go b/connector/logs_router.go index 0db9ea7799d..37a3831038e 100644 --- a/connector/logs_router.go +++ b/connector/logs_router.go @@ -9,6 +9,7 @@ import ( "go.uber.org/multierr" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector/internal" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" ) @@ -23,7 +24,7 @@ type LogsRouterAndConsumer interface { type logsRouter struct { consumer.Logs - baseRouter[consumer.Logs] + internal.BaseRouter[consumer.Logs] } func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouterAndConsumer { @@ -33,13 +34,13 @@ func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouterAndConsumer { } return &logsRouter{ Logs: fanoutconsumer.NewLogs(consumers), - baseRouter: newBaseRouter(fanoutconsumer.NewLogs, cm), + BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewLogs, cm), } } func (r *logsRouter) PipelineIDs() []component.ID { - ids := make([]component.ID, 0, len(r.consumers)) - for id := range r.consumers { + ids := make([]component.ID, 0, len(r.Consumers)) + for id := range r.Consumers { ids = append(ids, id) } return ids @@ -52,7 +53,7 @@ func (r *logsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Logs, error consumers := make([]consumer.Logs, 0, len(pipelineIDs)) var errors error for _, pipelineID := range pipelineIDs { - c, ok := r.consumers[pipelineID] + c, ok := r.Consumers[pipelineID] if ok { consumers = append(consumers, c) } else { diff --git a/connector/metrics_router.go b/connector/metrics_router.go index 3e688261bfe..67ba27f86a0 100644 --- a/connector/metrics_router.go +++ b/connector/metrics_router.go @@ -5,6 +5,7 @@ package connector // import "go.opentelemetry.io/collector/connector" import ( "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector/internal" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" ) @@ -19,7 +20,7 @@ type MetricsRouterAndConsumer interface { type metricsRouter struct { consumer.Metrics - baseRouter[consumer.Metrics] + internal.BaseRouter[consumer.Metrics] } func NewMetricsRouter(cm map[component.ID]consumer.Metrics) MetricsRouterAndConsumer { @@ -29,7 +30,7 @@ func NewMetricsRouter(cm map[component.ID]consumer.Metrics) MetricsRouterAndCons } return &metricsRouter{ Metrics: fanoutconsumer.NewMetrics(consumers), - baseRouter: newBaseRouter(fanoutconsumer.NewMetrics, cm), + BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewMetrics, cm), } } diff --git a/connector/traces_router.go b/connector/traces_router.go index 84eb889c05a..40cfbdd18a0 100644 --- a/connector/traces_router.go +++ b/connector/traces_router.go @@ -5,6 +5,7 @@ package connector // import "go.opentelemetry.io/collector/connector" import ( "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector/internal" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" ) @@ -19,7 +20,7 @@ type TracesRouterAndConsumer interface { type tracesRouter struct { consumer.Traces - baseRouter[consumer.Traces] + internal.BaseRouter[consumer.Traces] } func NewTracesRouter(cm map[component.ID]consumer.Traces) TracesRouterAndConsumer { @@ -29,7 +30,7 @@ func NewTracesRouter(cm map[component.ID]consumer.Traces) TracesRouterAndConsume } return &tracesRouter{ Traces: fanoutconsumer.NewTraces(consumers), - baseRouter: newBaseRouter(fanoutconsumer.NewTraces, cm), + BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewTraces, cm), } }