Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add observer support to receiver_creator #173

Merged
merged 6 commits into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extension/observer/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
3 changes: 3 additions & 0 deletions extension/observer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer

go 1.14
122 changes: 122 additions & 0 deletions extension/observer/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"fmt"
)

// Protocol defines network protocol for container ports.
type Protocol string

const (
// ProtocolTCP is the TCP protocol.
ProtocolTCP Protocol = "TCP"
// ProtocolUDP is the UDP protocol.
ProtocolUDP Protocol = "UDP"
)

// Endpoint is a service that can be contacted remotely.
type Endpoint interface {
// ID uniquely identifies this endpoint.
ID() string
// Target is an IP address or hostname of the endpoint.
Target() string
// String pretty formats the endpoint.
String() string
// Labels is a map of arbitrary metadata.
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
Labels() map[string]string
}

// endpointBase is common endpoint data used across all endpoint types.
type endpointBase struct {
id string
target string
labels map[string]string
}

func (e *endpointBase) ID() string {
return e.id
}

func (e *endpointBase) Target() string {
return e.target
}

func (e *endpointBase) Labels() map[string]string {
return e.labels
}

// HostEndpoint is an endpoint that just has a target but no identifying port information.
type HostEndpoint struct {
endpointBase
}

func (h *HostEndpoint) String() string {
return fmt.Sprintf("HostEndpoint{id: %v, Target: %v, Labels: %v}", h.ID(), h.target, h.labels)
}

// NewHostEndpoint creates a HostEndpoint
func NewHostEndpoint(id string, target string, labels map[string]string) *HostEndpoint {
return &HostEndpoint{endpointBase{
id: id,
target: target,
labels: labels,
}}
}

var _ Endpoint = (*HostEndpoint)(nil)

// PortEndpoint is an endpoint that has a target as well as a port.
type PortEndpoint struct {
endpointBase
Port uint16
}

func (p *PortEndpoint) String() string {
return fmt.Sprintf("PortEndpoint{ID: %v, Target: %v, Port: %d, Labels: %v}", p.ID(), p.target, p.Port, p.labels)
}

// NewPortEndpoint creates a PortEndpoint.
func NewPortEndpoint(id string, target string, port uint16, labels map[string]string) *PortEndpoint {
return &PortEndpoint{endpointBase: endpointBase{
id: id,
target: target,
labels: labels,
}, Port: port}
}

var _ Endpoint = (*PortEndpoint)(nil)

// Observable is an interface that provides notification of endpoint changes.
type Observable interface {
// TODO: Stopping.
// ListAndWatch provides initial state sync as well as change notification.
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
// notify.OnAdd will be called one or more times if there are endpoints discovered.
// (It would not be called if there are no endpoints present.) The endpoint synchronization
// happens asynchronously to this call.
ListAndWatch(notify Notify)
}

// Notify is the callback for Observer events.
type Notify interface {
// OnAdd is called once or more initially for state sync as well as when further endpoints are added.
OnAdd(added []Endpoint)
// OnRemove is called when one or more endpoints are removed.
OnRemove(removed []Endpoint)
// OnChange is called when one ore more endpoints are modified but the identity is not changed
// (e.g. labels).
OnChange(changed []Endpoint)
tigrannajaryan marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ github.com/honeycombio/opentelemetry-exporter-go v0.3.1 h1:oQZwILb+oEyQ45Sa+YGTr
github.com/honeycombio/opentelemetry-exporter-go v0.3.1/go.mod h1:enUrMJYC617rm8rZjKSEGapNSSiUctgIVBFKFSWGGiM=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
45 changes: 26 additions & 19 deletions receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,48 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

// receiverConfig describes a receiver instance with a default config.
type receiverConfig struct {
// fullName is the full subreceiver name (ie <receiver type>/<id>).
fullName string
// typeStr is set based on the configured receiver name.
typeStr string
// config is the map configured by the user in the config file. It is the contents of the map from
// the "config" section. The keys and values are arbitrarily configured by the user.
config userConfigMap
}

// userConfigMap is an arbitrary map of string keys to arbitrary values as specified by the user
type userConfigMap map[string]interface{}

// subreceiverConfig is the configuration of a single subreceiver configured inside
// receiver_creator.
type subreceiverConfig struct {
// receiverTemplate is the configuration of a single subreceiver.
type receiverTemplate struct {
receiverConfig

// Rule is the discovery rule that when matched will create a receiver instance
// based on subreceiverConfig.
// based on receiverTemplate.
Rule string `mapstructure:"rule"`
// receiverType is set based on the configured receiver name.
receiverType string
// config is the map configured by the user in the config file. It is the contents of the map from
// the "config" section. The keys and values are arbitrarily configured by the user.
config userConfigMap
// fullName is the full subreceiver name (ie <receiver type>/<id>).
fullName string
}

// newSubreceiverConfig creates a subreceiverConfig instance from the full name of a subreceiver
// newReceiverTemplate creates a receiverTemplate instance from the full name of a subreceiver
// and its arbitrary config map values.
func newSubreceiverConfig(name string, config userConfigMap) (*subreceiverConfig, error) {
func newReceiverTemplate(name string, config userConfigMap) (receiverTemplate, error) {
typeStr, fullName, err := otelconfig.DecodeTypeAndName(name)
if err != nil {
return nil, err
return receiverTemplate{}, err
}

return &subreceiverConfig{
receiverType: typeStr,
fullName: fullName,
config: config,
return receiverTemplate{
receiverConfig: receiverConfig{
typeStr: typeStr,
fullName: fullName,
config: config,
},
}, nil
}

// Config defines configuration for receiver_creator.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
subreceiverConfigs map[string]*subreceiverConfig
receiverTemplates map[string]receiverTemplate
}
10 changes: 5 additions & 5 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func TestLoadConfig(t *testing.T) {
r1 := cfg.Receivers["receiver_creator/1"].(*Config)

assert.NotNil(t, r1)
assert.Len(t, r1.subreceiverConfigs, 1)
assert.Contains(t, r1.subreceiverConfigs, "examplereceiver/1")
assert.Equal(t, "test rule", r1.subreceiverConfigs["examplereceiver/1"].Rule)
assert.Len(t, r1.receiverTemplates, 1)
assert.Contains(t, r1.receiverTemplates, "examplereceiver/1")
assert.Equal(t, "enabled", r1.receiverTemplates["examplereceiver/1"].Rule)
assert.Equal(t, userConfigMap{
"endpoint": "localhost:12345",
}, r1.subreceiverConfigs["examplereceiver/1"].config)
endpointConfigKey: "localhost:12345",
}, r1.receiverTemplates["examplereceiver/1"].config)
}
8 changes: 4 additions & 4 deletions receiver/receivercreator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
// TODO: Change the sub-receivers to be under "receivers" to allow other config for the main receiver-creator receiver.
for subreceiverKey := range sourceViperSection.AllSettings() {
cfgSection := sourceViperSection.GetStringMap(subreceiverKey + "::config")
subreceiver, err := newSubreceiverConfig(subreceiverKey, cfgSection)
subreceiver, err := newReceiverTemplate(subreceiverKey, cfgSection)
if err != nil {
return err
}
Expand All @@ -66,7 +66,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return fmt.Errorf("failed to deserialize sub-receiver %q: %s", subreceiverKey, err)
}

c.subreceiverConfigs[subreceiverKey] = subreceiver
c.receiverTemplates[subreceiverKey] = subreceiver
}

return nil
Expand All @@ -80,7 +80,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
TypeVal: typeStr,
NameVal: typeStr,
},
subreceiverConfigs: map[string]*subreceiverConfig{},
receiverTemplates: map[string]receiverTemplate{},
}
}

Expand All @@ -96,5 +96,5 @@ func (f *Factory) CreateMetricsReceiver(
cfg configmodels.Receiver,
consumer consumer.MetricsConsumerOld,
) (component.MetricsReceiver, error) {
return new(logger, consumer, cfg.(*Config))
return newReceiverCreator(logger, consumer, cfg.(*Config))
}
4 changes: 2 additions & 2 deletions receiver/receivercreator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ go 1.14
require (
github.com/census-instrumentation/opencensus-proto v0.2.1
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200414190247-75ae9198a89e
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.0.0
github.com/spf13/viper v1.6.2
github.com/stretchr/testify v1.4.0
go.uber.org/zap v1.13.0
)

// Same version as from go.mod. Required to make go list -m work.
replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer => ../../extension/observer
2 changes: 2 additions & 0 deletions receiver/receivercreator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948 h1
contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948/go.mod h1:ukdzwIYYHgZ7QYtwVFQUjiT28BJHiMhTERo32s6qVgM=
contrib.go.opencensus.io/exporter/ocagent v0.6.0 h1:Z1n6UAyr0QwM284yUuh5Zd8JlvxUGAhFZcgMJkMPrGM=
contrib.go.opencensus.io/exporter/ocagent v0.6.0/go.mod h1:zmKjrJcdo0aYcVS7bmEeSEBLPA9YJp5bjrofdU3pIXs=
contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE5H/ukPWBRo314xiDvg=
contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A=
contrib.go.opencensus.io/resource v0.1.2 h1:b4WFJV8u7/NzPWHeTqj3Ec2AW8OGhtJxC/hbphIOvbU=
contrib.go.opencensus.io/resource v0.1.2/go.mod h1:F361eGI91LCmW1I/Saf+rX0+OFcigGlFvXwEGEnkRLA=
Expand Down Expand Up @@ -610,6 +611,7 @@ github.com/hashicorp/serf v0.8.3 h1:MWYcmct5EtKz0efYooPcL0yNkem+7kWxqXDi/UIh+8k=
github.com/hashicorp/serf v0.8.3/go.mod h1:UpNcs7fFbpKIyZaUuSW6EPiH+eZC7OuyFD+wc1oal+k=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down
107 changes: 107 additions & 0 deletions receiver/receivercreator/observerhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receivercreator

import (
"fmt"
"sync"

"github.com/open-telemetry/opentelemetry-collector/component/componenterror"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)

var _ observer.Notify = (*observerHandler)(nil)

// observerHandler manages endpoint change notifications.
type observerHandler struct {
sync.Mutex
logger *zap.Logger
// receiverTemplates maps receiver template full name to a receiverTemplate value.
receiverTemplates map[string]receiverTemplate
// receiversByEndpointID is a map of endpoint IDs to a receiver instance.
receiversByEndpointID receiverMap
// runner starts and stops receiver instances.
runner runner
}

// Shutdown all receivers started at runtime.
func (obs *observerHandler) Shutdown() error {
obs.Lock()
defer obs.Unlock()

var errs []error

for _, rcvr := range obs.receiversByEndpointID.Values() {
if err := obs.runner.shutdown(rcvr); err != nil {
// TODO: Should keep track of which receiver the error is associated with
// but require some restructuring.
errs = append(errs, err)
}
}

if len(errs) > 0 {
return fmt.Errorf("shutdown on %d receivers failed: %v", len(errs), componenterror.CombineErrors(errs))
}

return nil
}

// OnAdd responds to endpoint add notifications.
func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
obs.Lock()
defer obs.Unlock()

for _, e := range added {
for _, template := range obs.receiverTemplates {
if !ruleMatches(template.Rule, e) {
continue
}
rcvr, err := obs.runner.start(template.receiverConfig, userConfigMap{
endpointConfigKey: e.Target(),
})
if err != nil {
obs.logger.Error("failed to start receiver", zap.String("receiver", template.fullName))
continue
}

obs.receiversByEndpointID.Put(e.ID(), rcvr)
}
}
}

// OnRemove responds to endpoint removal notifications.
func (obs *observerHandler) OnRemove(removed []observer.Endpoint) {
obs.Lock()
defer obs.Unlock()

for _, e := range removed {
for _, rcvr := range obs.receiversByEndpointID.Get(e.ID()) {
if err := obs.runner.shutdown(rcvr); err != nil {
obs.logger.Error("failed to stop receiver", zap.Reflect("receiver", rcvr))
continue
}
}
obs.receiversByEndpointID.RemoveAll(e.ID())
}
}

// OnChange responds to endpoint change notifications.
func (obs *observerHandler) OnChange(changed []observer.Endpoint) {
// TODO: optimize to only restart if effective config has changed.
obs.OnRemove(changed)
obs.OnAdd(changed)
}
Loading