diff --git a/receiver/receivercreator/config.go b/receiver/receivercreator/config.go index 814011d293f0..a823290380d6 100644 --- a/receiver/receivercreator/config.go +++ b/receiver/receivercreator/config.go @@ -86,7 +86,7 @@ type Config struct { config.ReceiverSettings `mapstructure:",squash"` receiverTemplates map[string]receiverTemplate // WatchObservers are the extensions to listen to endpoints from. - WatchObservers []config.Type `mapstructure:"watch_observers"` + WatchObservers []config.ComponentID `mapstructure:"watch_observers"` // ResourceAttributes is a map of default resource attributes to add to each resource // object received by this receiver from dynamically created receivers. ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"` diff --git a/receiver/receivercreator/config_test.go b/receiver/receivercreator/config_test.go index ac563fd3f3ce..2271a8491258 100644 --- a/receiver/receivercreator/config_test.go +++ b/receiver/receivercreator/config_test.go @@ -90,7 +90,10 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, userConfigMap{ endpointConfigKey: "localhost:12345", }, r1.receiverTemplates["nop/1"].config) - assert.Equal(t, []config.Type{"mock_observer"}, r1.WatchObservers) + assert.Equal(t, []config.ComponentID{ + config.NewComponentID("mock_observer"), + config.NewComponentIDWithName("mock_observer", "with_name"), + }, r1.WatchObservers) } func TestInvalidResourceAttributeEndpointType(t *testing.T) { diff --git a/receiver/receivercreator/receiver.go b/receiver/receivercreator/receiver.go index 194c8841357b..19d73d66f9d8 100644 --- a/receiver/receivercreator/receiver.go +++ b/receiver/receivercreator/receiver.go @@ -78,18 +78,18 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error { }, } - observers := map[config.Type]observer.Observable{} + observers := map[config.ComponentID]observer.Observable{} // Match all configured observables to the extensions that are running. for _, watchObserver := range rc.cfg.WatchObservers { - for cfg, ext := range host.GetExtensions() { - if cfg.Type() != watchObserver { + for cid, ext := range host.GetExtensions() { + if cid != watchObserver { continue } obs, ok := ext.(observer.Observable) if !ok { - return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver) + return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver.String()) } observers[watchObserver] = obs } @@ -98,7 +98,7 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error { // Make sure all observables are present before starting any. for _, watchObserver := range rc.cfg.WatchObservers { if observers[watchObserver] == nil { - return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver) + return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver.String()) } } diff --git a/receiver/receivercreator/receiver_test.go b/receiver/receivercreator/receiver_test.go index 15c0b8145799..86bbdff66a2e 100644 --- a/receiver/receivercreator/receiver_test.go +++ b/receiver/receivercreator/receiver_test.go @@ -69,7 +69,8 @@ var _ observer.Observable = (*mockObserver)(nil) func TestMockedEndToEnd(t *testing.T) { host, cfg := exampleCreatorFactory(t) host.extensions = map[config.ComponentID]component.Extension{ - config.NewComponentID("mock_observer"): &mockObserver{}, + config.NewComponentID("mock_observer"): &mockObserver{}, + config.NewComponentIDWithName("mock_observer", "with_name"): &mockObserver{}, } dynCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "1")] factory := NewFactory() @@ -90,8 +91,8 @@ func TestMockedEndToEnd(t *testing.T) { defer shutdown() require.Eventuallyf(t, func() bool { - return dyn.observerHandler.receiversByEndpointID.Size() == 1 - }, 1*time.Second, 100*time.Millisecond, "expected 1 receiver but got %v", dyn.observerHandler.receiversByEndpointID) + return dyn.observerHandler.receiversByEndpointID.Size() == 2 + }, 1*time.Second, 100*time.Millisecond, "expected 2 receiver but got %v", dyn.observerHandler.receiversByEndpointID) // Test that we can send metrics. for _, receiver := range dyn.observerHandler.receiversByEndpointID.Values() { @@ -126,7 +127,7 @@ func TestMockedEndToEnd(t *testing.T) { } // TODO: Will have to rework once receivers are started asynchronously to Start(). - assert.Len(t, mockConsumer.AllMetrics(), 1) + assert.Len(t, mockConsumer.AllMetrics(), 2) } func TestLoggingHost(t *testing.T) { diff --git a/receiver/receivercreator/testdata/config.yaml b/receiver/receivercreator/testdata/config.yaml index 0b663af41764..ae69af596243 100644 --- a/receiver/receivercreator/testdata/config.yaml +++ b/receiver/receivercreator/testdata/config.yaml @@ -1,7 +1,9 @@ receivers: receiver_creator: receiver_creator/1: - watch_observers: [mock_observer] + watch_observers: + - mock_observer + - mock_observer/with_name receivers: examplereceiver/1: rule: type == "port" diff --git a/unreleased/receiver-creator-obs-cid.yaml b/unreleased/receiver-creator-obs-cid.yaml new file mode 100755 index 000000000000..aefa50b6bd4a --- /dev/null +++ b/unreleased/receiver-creator-obs-cid.yaml @@ -0,0 +1,4 @@ +change_type: bug_fix +component: receiver_creator +note: Correct observer instance reference by updating watch_observers use ComponentID instead of just Type +issues: [12801]