Skip to content

Commit

Permalink
[receiver/receiver_creator] correct watch_observers references to ext…
Browse files Browse the repository at this point in the history
…ension instances (#12801)

[receiver/receiver_creator] remove  per-observer type limitation
  • Loading branch information
rmfitzpatrick authored Aug 19, 2022
1 parent 8a821bb commit 0da606e
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 12 deletions.
2 changes: 1 addition & 1 deletion receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Config struct {
config.ReceiverSettings `mapstructure:",squash"`
receiverTemplates map[string]receiverTemplate
// WatchObservers are the extensions to listen to endpoints from.
WatchObservers []config.Type `mapstructure:"watch_observers"`
WatchObservers []config.ComponentID `mapstructure:"watch_observers"`
// ResourceAttributes is a map of default resource attributes to add to each resource
// object received by this receiver from dynamically created receivers.
ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"`
Expand Down
5 changes: 4 additions & 1 deletion receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, userConfigMap{
endpointConfigKey: "localhost:12345",
}, r1.receiverTemplates["nop/1"].config)
assert.Equal(t, []config.Type{"mock_observer"}, r1.WatchObservers)
assert.Equal(t, []config.ComponentID{
config.NewComponentID("mock_observer"),
config.NewComponentIDWithName("mock_observer", "with_name"),
}, r1.WatchObservers)
}

func TestInvalidResourceAttributeEndpointType(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions receiver/receivercreator/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error {
},
}

observers := map[config.Type]observer.Observable{}
observers := map[config.ComponentID]observer.Observable{}

// Match all configured observables to the extensions that are running.
for _, watchObserver := range rc.cfg.WatchObservers {
for cfg, ext := range host.GetExtensions() {
if cfg.Type() != watchObserver {
for cid, ext := range host.GetExtensions() {
if cid != watchObserver {
continue
}

obs, ok := ext.(observer.Observable)
if !ok {
return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver)
return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver.String())
}
observers[watchObserver] = obs
}
Expand All @@ -98,7 +98,7 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error {
// Make sure all observables are present before starting any.
for _, watchObserver := range rc.cfg.WatchObservers {
if observers[watchObserver] == nil {
return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver)
return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver.String())
}
}

Expand Down
9 changes: 5 additions & 4 deletions receiver/receivercreator/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ var _ observer.Observable = (*mockObserver)(nil)
func TestMockedEndToEnd(t *testing.T) {
host, cfg := exampleCreatorFactory(t)
host.extensions = map[config.ComponentID]component.Extension{
config.NewComponentID("mock_observer"): &mockObserver{},
config.NewComponentID("mock_observer"): &mockObserver{},
config.NewComponentIDWithName("mock_observer", "with_name"): &mockObserver{},
}
dynCfg := cfg.Receivers[config.NewComponentIDWithName(typeStr, "1")]
factory := NewFactory()
Expand All @@ -90,8 +91,8 @@ func TestMockedEndToEnd(t *testing.T) {
defer shutdown()

require.Eventuallyf(t, func() bool {
return dyn.observerHandler.receiversByEndpointID.Size() == 1
}, 1*time.Second, 100*time.Millisecond, "expected 1 receiver but got %v", dyn.observerHandler.receiversByEndpointID)
return dyn.observerHandler.receiversByEndpointID.Size() == 2
}, 1*time.Second, 100*time.Millisecond, "expected 2 receiver but got %v", dyn.observerHandler.receiversByEndpointID)

// Test that we can send metrics.
for _, receiver := range dyn.observerHandler.receiversByEndpointID.Values() {
Expand Down Expand Up @@ -126,7 +127,7 @@ func TestMockedEndToEnd(t *testing.T) {
}

// TODO: Will have to rework once receivers are started asynchronously to Start().
assert.Len(t, mockConsumer.AllMetrics(), 1)
assert.Len(t, mockConsumer.AllMetrics(), 2)
}

func TestLoggingHost(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion receiver/receivercreator/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
receivers:
receiver_creator:
receiver_creator/1:
watch_observers: [mock_observer]
watch_observers:
- mock_observer
- mock_observer/with_name
receivers:
examplereceiver/1:
rule: type == "port"
Expand Down
4 changes: 4 additions & 0 deletions unreleased/receiver-creator-obs-cid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
change_type: bug_fix
component: receiver_creator
note: Correct observer instance reference by updating watch_observers use ComponentID instead of just Type
issues: [12801]

0 comments on commit 0da606e

Please sign in to comment.