Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/receiver_creator] correct watch_observers references to extension instances #12801

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Config struct {
config.ReceiverSettings `mapstructure:",squash"`
receiverTemplates map[string]receiverTemplate
// WatchObservers are the extensions to listen to endpoints from.
WatchObservers []config.Type `mapstructure:"watch_observers"`
WatchObservers []config.ComponentID `mapstructure:"watch_observers"`
// ResourceAttributes is a map of default resource attributes to add to each resource
// object received by this receiver from dynamically created receivers.
ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"`
Expand Down
5 changes: 4 additions & 1 deletion receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, userConfigMap{
endpointConfigKey: "localhost:12345",
}, r1.receiverTemplates["nop/1"].config)
assert.Equal(t, []config.Type{"mock_observer"}, r1.WatchObservers)
assert.Equal(t, []config.ComponentID{
config.NewComponentID("mock_observer"),
config.NewComponentIDWithName("mock_observer", "with_name"),
}, r1.WatchObservers)
}

func TestInvalidResourceAttributeEndpointType(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions receiver/receivercreator/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error {
},
}

observers := map[config.Type]observer.Observable{}
observers := map[config.ComponentID]observer.Observable{}

// Match all configured observables to the extensions that are running.
for _, watchObserver := range rc.cfg.WatchObservers {
for cfg, ext := range host.GetExtensions() {
if cfg.Type() != watchObserver {
for cid, ext := range host.GetExtensions() {
if cid != watchObserver {
continue
}

obs, ok := ext.(observer.Observable)
if !ok {
return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver)
return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver.String())
}
observers[watchObserver] = obs
}
Expand All @@ -98,7 +98,7 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error {
// Make sure all observables are present before starting any.
for _, watchObserver := range rc.cfg.WatchObservers {
if observers[watchObserver] == nil {
return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver)
return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver.String())
}
}

Expand Down
9 changes: 5 additions & 4 deletions receiver/receivercreator/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ var _ observer.Observable = (*mockObserver)(nil)
func TestMockedEndToEnd(t *testing.T) {
host, cfg := exampleCreatorFactory(t)
host.extensions = map[config.ComponentID]component.Extension{
config.NewComponentID("mock_observer"): &mockObserver{},
config.NewComponentID("mock_observer"): &mockObserver{},
config.NewComponentIDWithName("mock_observer", "with_name"): &mockObserver{},
}
dynCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "1")]
factory := NewFactory()
Expand All @@ -90,8 +91,8 @@ func TestMockedEndToEnd(t *testing.T) {
defer shutdown()

require.Eventuallyf(t, func() bool {
return dyn.observerHandler.receiversByEndpointID.Size() == 1
}, 1*time.Second, 100*time.Millisecond, "expected 1 receiver but got %v", dyn.observerHandler.receiversByEndpointID)
return dyn.observerHandler.receiversByEndpointID.Size() == 2
}, 1*time.Second, 100*time.Millisecond, "expected 2 receiver but got %v", dyn.observerHandler.receiversByEndpointID)

// Test that we can send metrics.
for _, receiver := range dyn.observerHandler.receiversByEndpointID.Values() {
Expand Down Expand Up @@ -126,7 +127,7 @@ func TestMockedEndToEnd(t *testing.T) {
}

// TODO: Will have to rework once receivers are started asynchronously to Start().
assert.Len(t, mockConsumer.AllMetrics(), 1)
assert.Len(t, mockConsumer.AllMetrics(), 2)
}

func TestLoggingHost(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion receiver/receivercreator/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
receivers:
receiver_creator:
receiver_creator/1:
watch_observers: [mock_observer]
watch_observers:
- mock_observer
- mock_observer/with_name
receivers:
examplereceiver/1:
rule: type == "port"
Expand Down
4 changes: 4 additions & 0 deletions unreleased/receiver-creator-obs-cid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
change_type: bug_fix
component: receiver_creator
note: Correct observer instance reference by updating watch_observers use ComponentID instead of just Type
issues: [12801]