Skip to content

Commit

Permalink
Initial discovery receiver skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Fitzpatrick committed Aug 17, 2022
1 parent 4a2ba40 commit fd7f2c8
Show file tree
Hide file tree
Showing 13 changed files with 850 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter v0.58.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.58.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder v0.58.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.58.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/dockerobserver v0.58.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver v0.58.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecstaskobserver v0.58.0
Expand Down Expand Up @@ -309,7 +310,6 @@ require (
github.com/observiq/ctimefmt v1.0.0 // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.58.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.58.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.58.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.58.0 // indirect
Expand Down
162 changes: 162 additions & 0 deletions internal/receiver/discoveryreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discoveryreceiver

import (
"fmt"

"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"
)

var (
_ config.Receiver = (*Config)(nil)

allowedStatusTypes = []string{"successful", "partial", "failed"}
allowedStatuses = func() map[string]struct{} {
sm := map[string]struct{}{}
for _, status := range allowedStatusTypes {
sm[status] = struct{}{}
}
return sm
}()

allowedMatchTypes = []string{"regexp", "strict", "expr"}
)

type Config struct {
// Receivers is a mapping of all receiver types to discovery and whose metrics
// and application statements to evaluate for assessing component status.
Receivers map[config.ComponentID]ReceiverEntry `mapstructure:"receivers"`
config.ReceiverSettings `mapstructure:",squash"`
// The configured Observer extensions from which to receive Endpoint events.
// Must implement the observer.Observable interface.
WatchObservers []config.ComponentID `mapstructure:"watch_observers"`
// Whether to emit log records for all endpoint activity, consisting of Endpoint
// content as record attributes.
LogEndpoints bool `mapstructure:"log_endpoints"`
// Whether to include the receiver config as a base64-encoded "discovery.receiver.config"
// resource attribute string value. Will also contain the configured observer that
// produced the endpoint leading to receiver creation.
// Warning: these values will include the literal receiver subconfig from the parent Collector config
// and provides no secret redaction and is easily decodable into plaintext.
EmbedReceiverConfig bool `mapstructure:"embed_receiver_config"`
}

// ReceiverEntry is a definition for a receiver instance to instantiate for each Endpoint matching
// the defined rule. Its Config, ResourceAttributes, and Rule will be marshaled to the internal
// Receiver Creator config.
type ReceiverEntry struct {
Config map[string]any `mapstructure:"config"`
Status *Status `mapstructure:"status"`
ResourceAttributes map[string]string `mapstructure:"resource_attributes"`
Rule string `mapstructure:"rule"`
}

// Status defines the Match rules for applicable app and telemetry sources.
// At this time only Metrics and zap logger Statements status source types are supported.
type Status struct {
Metrics map[string][]Match `mapstructure:"metrics"`
Statements map[string][]Match `mapstructure:"statements"`
}

// Match defines the rules for the desired match type and resulting log record
// content emitted by the Discovery receiver
type Match struct {
Record *LogRecord `mapstructure:"log_record"`
Strict string `mapstructure:"strict"`
Regexp string `mapstructure:"regexp"`
Expr string `mapstructure:"expr"`
FirstOnly bool `mapstructure:"first_only"`
}

// LogRecord is a definition of the desired plog.LogRecord content to emit for a match.
type LogRecord struct {
Attributes map[string]string `mapstructure:"attributes"`
Severity string `mapstructure:"severity"`
Body string `mapstructure:"body"`
}

func (cfg *Config) Validate() error {
var err error
for rName, rEntry := range cfg.Receivers {
if e := rEntry.validate(); e != nil {
err = multierr.Combine(err, fmt.Errorf("receiver %q validation failure: %w", rName, e))
}
}

if cfg.WatchObservers == nil || len(cfg.WatchObservers) == 0 {
err = multierr.Combine(err, fmt.Errorf("`watch_observers` must be defined and include at least one configured observer extension"))
}

return err
}

func (re *ReceiverEntry) validate() error {
if err := re.Status.validate(); err != nil {
return err
}
return nil
}

func (s *Status) validate() error {
if s == nil {
return fmt.Errorf("`status` must be defined and contain at least one `metrics` or `statements` mapping")
}

if len(s.Metrics) == 0 && len(s.Statements) == 0 {
return fmt.Errorf("`status` must contain at least one `metrics` or `statements` mapping with at least one of %v", allowedStatusTypes)
}

var err error
statusSources := []struct {
matches map[string][]Match
sourceType string
}{{s.Metrics, "metrics"}, {s.Statements, "statements"}}
for _, statusSource := range statusSources {
for statusType, statements := range statusSource.matches {
if _, ok := allowedStatuses[statusType]; !ok {
err = multierr.Combine(err, fmt.Errorf("unsupported status %q. must be one of %v", statusType, allowedStatusTypes))
continue
}
for _, logMatch := range statements {
var matchTypes []string
if logMatch.Strict != "" {
matchTypes = append(matchTypes, "strict")
}
if logMatch.Regexp != "" {
matchTypes = append(matchTypes, "regexp")
}
if logMatch.Expr != "" {
matchTypes = append(matchTypes, "expr")
}
if len(matchTypes) != 1 {
err = multierr.Combine(err, fmt.Errorf(
"`%s` status source type `%s` match type validation failed. Must provide one of %v but received %v", statusSource.sourceType, statusType, allowedMatchTypes, matchTypes,
))
}
if e := logMatch.Record.validate(); e != nil {
err = multierr.Combine(err, fmt.Errorf(" %q log record validation failure: %w", statusType, e))
}
}
}
}
return err
}

func (lr *LogRecord) validate() error {
// TODO: supported severity text validation
return nil
}
143 changes: 143 additions & 0 deletions internal/receiver/discoveryreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discoveryreceiver

import (
"fmt"
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"
)

func TestValidConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)

factory := NewFactory()
factories.Receivers[typeStr] = factory
collectorConfig, err := servicetest.LoadConfig(
path.Join(".", "testdata", "config.yaml"), factories,
)

require.NoError(t, err)
require.NotNil(t, collectorConfig)

assert.Equal(t, len(collectorConfig.Receivers), 1)

cfg := collectorConfig.Receivers[config.NewComponentID(typeStr)].(*Config)
require.Equal(t, &Config{
Receivers: map[config.ComponentID]ReceiverEntry{
config.NewComponentIDWithName("smartagent", "redis"): {
Config: map[string]any{
"auth": "password",
"host": "`host`",
"port": "`port`",
"type": "collectd/redis",
},
Status: &Status{
Metrics: map[string][]Match{
"successful": {
Match{
Record: &LogRecord{
Attributes: map[string]string{
"attr_one": "attr_one_val",
"attr_two": "attr_two_val",
},
Severity: "info",
Body: "smartagent/redis receiver successful status",
},
Strict: "",
Regexp: ".*",
Expr: "",
FirstOnly: true,
},
},
},
Statements: map[string][]Match{
"failed": {
{
Strict: "",
Regexp: "ConnectionRefusedError",
Expr: "",
FirstOnly: true,
Record: &LogRecord{
Attributes: map[string]string{},
Severity: "info",
Body: "container appears to not be accepting redis connections",
},
},
},
"partial": {
{
Strict: "",
Regexp: "(WRONGPASS|NOAUTH|ERR AUTH)",
Expr: "",
FirstOnly: false,
Record: &LogRecord{
Attributes: nil,
Severity: "warn",
Body: "desired log invalid auth log body",
},
},
},
},
},
ResourceAttributes: map[string]string{
"receiver_attribute": "receiver_attribute_value",
},
Rule: "type == \"container\"",
},
},
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("discovery")),
LogEndpoints: true,
EmbedReceiverConfig: true,
WatchObservers: []config.ComponentID{
config.NewComponentID("an_observer"),
config.NewComponentIDWithName("another_observer", "with_name"),
},
},
cfg)
require.NoError(t, cfg.Validate())
}

func TestInvalidConfigs(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.Nil(t, err)
factory := NewFactory()
factories.Receivers[typeStr] = factory

tests := []struct{ name, expectedError string }{
{name: "no_watch_observers", expectedError: "receiver \"discovery\" has invalid configuration: `watch_observers` must be defined and include at least one configured observer extension"},
{name: "missing_status", expectedError: "receiver \"discovery\" has invalid configuration: receiver \"a_receiver\" validation failure: `status` must be defined and contain at least one `metrics` or `statements` mapping"},
{name: "missing_status_metrics_and_statements", expectedError: "receiver \"discovery\" has invalid configuration: receiver \"a_receiver\" validation failure: `status` must be defined and contain at least one `metrics` or `statements` mapping"},
{name: "invalid_status_types", expectedError: `receiver "discovery" has invalid configuration: receiver "a_receiver" validation failure: unsupported status "unsupported". must be one of [successful partial failed]; unsupported status "another_unsupported". must be one of [successful partial failed]`},
{name: "multiple_status_match_types", expectedError: "receiver \"discovery\" has invalid configuration: receiver \"a_receiver\" validation failure: `metrics` status source type `successful` match type validation failed. Must provide one of [regexp strict expr] but received [strict regexp]; `statements` status source type `failed` match type validation failed. Must provide one of [regexp strict expr] but received [strict expr]"},
}

for _, test := range tests {
func(name, expectedError string) {
t.Run(name, func(t *testing.T) {
_, err = servicetest.LoadConfigAndValidate(path.Join(".", "testdata", fmt.Sprintf("%s.yaml", name)), factories)
require.Error(t, err)
require.EqualError(t, err, expectedError)
})
}(test.name, test.expectedError)
}
}
55 changes: 55 additions & 0 deletions internal/receiver/discoveryreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package discoveryreceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)

const (
typeStr = "discovery"
)

func NewFactory() component.ReceiverFactory {
return component.NewReceiverFactory(
typeStr,
createDefaultConfig,
component.WithLogsReceiver(createLogsReceiver, component.StabilityLevelInDevelopment))
}

func createDefaultConfig() config.Receiver {
return &Config{
ReceiverSettings: config.NewReceiverSettings(config.NewComponentID(typeStr)),
LogEndpoints: false,
EmbedReceiverConfig: false,
}
}

func createLogsReceiver(
_ context.Context,
settings component.ReceiverCreateSettings,
cfg config.Receiver,
consumer consumer.Logs,
) (component.LogsReceiver, error) {
dCfg := cfg.(*Config)
if err := dCfg.Validate(); err != nil {
return nil, err
}
return newDiscoveryReceiver(settings, dCfg, consumer)
}
Loading

0 comments on commit fd7f2c8

Please sign in to comment.