Skip to content

Commit

Permalink
Add observer support to receiver_creator (#173)
Browse files Browse the repository at this point in the history
* Add observer notification interface (k8s observer will be in separate PR)
* Refactor receiver_creator to be more easily testable and organized
  * receiver.go mostly implements OT interface and delegates to the new files
  * observerhandler.go responds to observer events and manages the starting/stopping of receivers
  * rules.go implements rules evaluation (not currently implemented)
  * runner.go contains a runner interface that handles the details of how to start and stop a receiver instance
that the observer handler wants to start/stop
* Implement basic add/remove/change response in receiver_creator to observer events
  • Loading branch information
jrcamp authored and wyTrivail committed Jul 13, 2020
1 parent b18c1ca commit 23fe25c
Show file tree
Hide file tree
Showing 19 changed files with 759 additions and 157 deletions.
1 change: 1 addition & 0 deletions extension/observer/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
3 changes: 3 additions & 0 deletions extension/observer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer

go 1.14
122 changes: 122 additions & 0 deletions extension/observer/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"fmt"
)

// Protocol defines network protocol for container ports.
type Protocol string

const (
// ProtocolTCP is the TCP protocol.
ProtocolTCP Protocol = "TCP"
// ProtocolUDP is the UDP protocol.
ProtocolUDP Protocol = "UDP"
)

// Endpoint is a service that can be contacted remotely.
type Endpoint interface {
// ID uniquely identifies this endpoint.
ID() string
// Target is an IP address or hostname of the endpoint.
Target() string
// String pretty formats the endpoint.
String() string
// Labels is a map of arbitrary metadata.
Labels() map[string]string
}

// endpointBase is common endpoint data used across all endpoint types.
type endpointBase struct {
id string
target string
labels map[string]string
}

func (e *endpointBase) ID() string {
return e.id
}

func (e *endpointBase) Target() string {
return e.target
}

func (e *endpointBase) Labels() map[string]string {
return e.labels
}

// HostEndpoint is an endpoint that just has a target but no identifying port information.
type HostEndpoint struct {
endpointBase
}

func (h *HostEndpoint) String() string {
return fmt.Sprintf("HostEndpoint{id: %v, Target: %v, Labels: %v}", h.ID(), h.target, h.labels)
}

// NewHostEndpoint creates a HostEndpoint
func NewHostEndpoint(id string, target string, labels map[string]string) *HostEndpoint {
return &HostEndpoint{endpointBase{
id: id,
target: target,
labels: labels,
}}
}

var _ Endpoint = (*HostEndpoint)(nil)

// PortEndpoint is an endpoint that has a target as well as a port.
type PortEndpoint struct {
endpointBase
Port uint16
}

func (p *PortEndpoint) String() string {
return fmt.Sprintf("PortEndpoint{ID: %v, Target: %v, Port: %d, Labels: %v}", p.ID(), p.target, p.Port, p.labels)
}

// NewPortEndpoint creates a PortEndpoint.
func NewPortEndpoint(id string, target string, port uint16, labels map[string]string) *PortEndpoint {
return &PortEndpoint{endpointBase: endpointBase{
id: id,
target: target,
labels: labels,
}, Port: port}
}

var _ Endpoint = (*PortEndpoint)(nil)

// Observable is an interface that provides notification of endpoint changes.
type Observable interface {
// TODO: Stopping.
// ListAndWatch provides initial state sync as well as change notification.
// notify.OnAdd will be called one or more times if there are endpoints discovered.
// (It would not be called if there are no endpoints present.) The endpoint synchronization
// happens asynchronously to this call.
ListAndWatch(notify Notify)
}

// Notify is the callback for Observer events.
type Notify interface {
// OnAdd is called once or more initially for state sync as well as when further endpoints are added.
OnAdd(added []Endpoint)
// OnRemove is called when one or more endpoints are removed.
OnRemove(removed []Endpoint)
// OnChange is called when one ore more endpoints are modified but the identity is not changed
// (e.g. labels).
OnChange(changed []Endpoint)
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ github.com/honeycombio/opentelemetry-exporter-go v0.3.1 h1:oQZwILb+oEyQ45Sa+YGTr
github.com/honeycombio/opentelemetry-exporter-go v0.3.1/go.mod h1:enUrMJYC617rm8rZjKSEGapNSSiUctgIVBFKFSWGGiM=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
45 changes: 26 additions & 19 deletions receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,48 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

// receiverConfig describes a receiver instance with a default config.
type receiverConfig struct {
// fullName is the full subreceiver name (ie <receiver type>/<id>).
fullName string
// typeStr is set based on the configured receiver name.
typeStr string
// config is the map configured by the user in the config file. It is the contents of the map from
// the "config" section. The keys and values are arbitrarily configured by the user.
config userConfigMap
}

// userConfigMap is an arbitrary map of string keys to arbitrary values as specified by the user
type userConfigMap map[string]interface{}

// subreceiverConfig is the configuration of a single subreceiver configured inside
// receiver_creator.
type subreceiverConfig struct {
// receiverTemplate is the configuration of a single subreceiver.
type receiverTemplate struct {
receiverConfig

// Rule is the discovery rule that when matched will create a receiver instance
// based on subreceiverConfig.
// based on receiverTemplate.
Rule string `mapstructure:"rule"`
// receiverType is set based on the configured receiver name.
receiverType string
// config is the map configured by the user in the config file. It is the contents of the map from
// the "config" section. The keys and values are arbitrarily configured by the user.
config userConfigMap
// fullName is the full subreceiver name (ie <receiver type>/<id>).
fullName string
}

// newSubreceiverConfig creates a subreceiverConfig instance from the full name of a subreceiver
// newReceiverTemplate creates a receiverTemplate instance from the full name of a subreceiver
// and its arbitrary config map values.
func newSubreceiverConfig(name string, config userConfigMap) (*subreceiverConfig, error) {
func newReceiverTemplate(name string, config userConfigMap) (receiverTemplate, error) {
typeStr, fullName, err := otelconfig.DecodeTypeAndName(name)
if err != nil {
return nil, err
return receiverTemplate{}, err
}

return &subreceiverConfig{
receiverType: typeStr,
fullName: fullName,
config: config,
return receiverTemplate{
receiverConfig: receiverConfig{
typeStr: typeStr,
fullName: fullName,
config: config,
},
}, nil
}

// Config defines configuration for receiver_creator.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
subreceiverConfigs map[string]*subreceiverConfig
receiverTemplates map[string]receiverTemplate
}
10 changes: 5 additions & 5 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func TestLoadConfig(t *testing.T) {
r1 := cfg.Receivers["receiver_creator/1"].(*Config)

assert.NotNil(t, r1)
assert.Len(t, r1.subreceiverConfigs, 1)
assert.Contains(t, r1.subreceiverConfigs, "examplereceiver/1")
assert.Equal(t, "test rule", r1.subreceiverConfigs["examplereceiver/1"].Rule)
assert.Len(t, r1.receiverTemplates, 1)
assert.Contains(t, r1.receiverTemplates, "examplereceiver/1")
assert.Equal(t, "enabled", r1.receiverTemplates["examplereceiver/1"].Rule)
assert.Equal(t, userConfigMap{
"endpoint": "localhost:12345",
}, r1.subreceiverConfigs["examplereceiver/1"].config)
endpointConfigKey: "localhost:12345",
}, r1.receiverTemplates["examplereceiver/1"].config)
}
8 changes: 4 additions & 4 deletions receiver/receivercreator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
// TODO: Change the sub-receivers to be under "receivers" to allow other config for the main receiver-creator receiver.
for subreceiverKey := range sourceViperSection.AllSettings() {
cfgSection := sourceViperSection.GetStringMap(subreceiverKey + "::config")
subreceiver, err := newSubreceiverConfig(subreceiverKey, cfgSection)
subreceiver, err := newReceiverTemplate(subreceiverKey, cfgSection)
if err != nil {
return err
}
Expand All @@ -66,7 +66,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return fmt.Errorf("failed to deserialize sub-receiver %q: %s", subreceiverKey, err)
}

c.subreceiverConfigs[subreceiverKey] = subreceiver
c.receiverTemplates[subreceiverKey] = subreceiver
}

return nil
Expand All @@ -80,7 +80,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
TypeVal: typeStr,
NameVal: typeStr,
},
subreceiverConfigs: map[string]*subreceiverConfig{},
receiverTemplates: map[string]receiverTemplate{},
}
}

Expand All @@ -96,5 +96,5 @@ func (f *Factory) CreateMetricsReceiver(
cfg configmodels.Receiver,
consumer consumer.MetricsConsumerOld,
) (component.MetricsReceiver, error) {
return new(logger, consumer, cfg.(*Config))
return newReceiverCreator(logger, consumer, cfg.(*Config))
}
4 changes: 2 additions & 2 deletions receiver/receivercreator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ go 1.14
require (
github.com/census-instrumentation/opencensus-proto v0.2.1
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200414190247-75ae9198a89e
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.0.0
github.com/spf13/viper v1.6.2
github.com/stretchr/testify v1.4.0
go.uber.org/zap v1.13.0
)

// Same version as from go.mod. Required to make go list -m work.
replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer => ../../extension/observer
2 changes: 2 additions & 0 deletions receiver/receivercreator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948 h1
contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948/go.mod h1:ukdzwIYYHgZ7QYtwVFQUjiT28BJHiMhTERo32s6qVgM=
contrib.go.opencensus.io/exporter/ocagent v0.6.0 h1:Z1n6UAyr0QwM284yUuh5Zd8JlvxUGAhFZcgMJkMPrGM=
contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs=
contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE5H/ukPWBRo314xiDvg=
contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A=
contrib.go.opencensus.io/resource v0.1.2 h1:b4WFJV8u7/NzPWHeTqj3Ec2AW8OGhtJxC/hbphIOvbU=
contrib.go.opencensus.io/resource v0.1.2/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA=
Expand Down Expand Up @@ -610,6 +611,7 @@ github.com/hashicorp/serf v0.8.3 h1:MWYcmct5EtKz0efYooPcL0yNkem+7kWxqXDi/UIh+8k=
github.com/hashicorp/serf v0.8.3/go.mod h1:UpNcs7fFbpKIyZaUuSW6EPiH+eZC7OuyFD+wc1oal+k=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
107 changes: 107 additions & 0 deletions receiver/receivercreator/observerhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receivercreator

import (
"fmt"
"sync"

"github.com/open-telemetry/opentelemetry-collector/component/componenterror"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

var _ observer.Notify = (*observerHandler)(nil)

// observerHandler manages endpoint change notifications.
type observerHandler struct {
sync.Mutex
logger *zap.Logger
// receiverTemplates maps receiver template full name to a receiverTemplate value.
receiverTemplates map[string]receiverTemplate
// receiversByEndpointID is a map of endpoint IDs to a receiver instance.
receiversByEndpointID receiverMap
// runner starts and stops receiver instances.
runner runner
}

// Shutdown all receivers started at runtime.
func (obs *observerHandler) Shutdown() error {
obs.Lock()
defer obs.Unlock()

var errs []error

for _, rcvr := range obs.receiversByEndpointID.Values() {
if err := obs.runner.shutdown(rcvr); err != nil {
// TODO: Should keep track of which receiver the error is associated with
// but require some restructuring.
errs = append(errs, err)
}
}

if len(errs) > 0 {
return fmt.Errorf("shutdown on %d receivers failed: %v", len(errs), componenterror.CombineErrors(errs))
}

return nil
}

// OnAdd responds to endpoint add notifications.
func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
obs.Lock()
defer obs.Unlock()

for _, e := range added {
for _, template := range obs.receiverTemplates {
if !ruleMatches(template.Rule, e) {
continue
}
rcvr, err := obs.runner.start(template.receiverConfig, userConfigMap{
endpointConfigKey: e.Target(),
})
if err != nil {
obs.logger.Error("failed to start receiver", zap.String("receiver", template.fullName))
continue
}

obs.receiversByEndpointID.Put(e.ID(), rcvr)
}
}
}

// OnRemove responds to endpoint removal notifications.
func (obs *observerHandler) OnRemove(removed []observer.Endpoint) {
obs.Lock()
defer obs.Unlock()

for _, e := range removed {
for _, rcvr := range obs.receiversByEndpointID.Get(e.ID()) {
if err := obs.runner.shutdown(rcvr); err != nil {
obs.logger.Error("failed to stop receiver", zap.Reflect("receiver", rcvr))
continue
}
}
obs.receiversByEndpointID.RemoveAll(e.ID())
}
}

// OnChange responds to endpoint change notifications.
func (obs *observerHandler) OnChange(changed []observer.Endpoint) {
// TODO: optimize to only restart if effective config has changed.
obs.OnRemove(changed)
obs.OnAdd(changed)
}
Loading

0 comments on commit 23fe25c

Please sign in to comment.