Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Create a single interface for the Receiver and Exporter parsers #2287

Merged
merged 20 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
85f47cb
Create a single interface for the Receiver and Exporter parsers
iblancasa Oct 27, 2023
dab0145
Fix lint
iblancasa Oct 27, 2023
d2c8f97
Use ComponentType
iblancasa Oct 30, 2023
f72dc0e
Merge branch 'main' into task/2008
iblancasa Nov 2, 2023
8369cd6
Merge branch 'task/2008' of github.com:iblancasa/opentelemetry-operat…
iblancasa Nov 2, 2023
e10a1d0
Merge branch 'main' into task/2008
iblancasa Nov 6, 2023
444f112
Merge branch 'main' into task/2008
iblancasa Nov 6, 2023
282504a
Merge branch 'task/2008' of github.com:iblancasa/opentelemetry-operat…
iblancasa Nov 6, 2023
8862610
Apply changes requested in code review
iblancasa Nov 6, 2023
333d5f5
Merge branch 'main' into task/2008
iblancasa Nov 13, 2023
b12d8da
Merge branch 'main' into task/2008
iblancasa Nov 14, 2023
4991ccb
Merge branch 'main' of github.com:open-telemetry/opentelemetry-operat…
iblancasa Nov 20, 2023
58d99df
Merge branch 'task/2008' of github.com:iblancasa/opentelemetry-operat…
iblancasa Nov 20, 2023
d5d53e8
Simplify getting the enabled components
iblancasa Nov 20, 2023
60a0409
Merge branch 'main' of github.com:open-telemetry/opentelemetry-operat…
iblancasa Nov 20, 2023
51cc28b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-operat…
iblancasa Nov 20, 2023
3ee5b15
Fix lint
iblancasa Nov 20, 2023
a21a207
Merge branch 'main' into task/2008
iblancasa Nov 23, 2023
7d835fe
Merge branch 'main' into task/2008
iblancasa Nov 27, 2023
2a52309
Merge branch 'main' into task/2008
iblancasa Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 45 additions & 106 deletions internal/manifests/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package adapters

import (
"errors"
"fmt"
"net"
"sort"
"strconv"
Expand All @@ -24,26 +24,13 @@ import (
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
exporterParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/exporter"
receiverParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
)

var (
// ErrNoExporters indicates that there are no exporters in the configuration.
ErrNoExporters = errors.New("no exporters available as part of the configuration")

// ErrNoReceivers indicates that there are no receivers in the configuration.
ErrNoReceivers = errors.New("no receivers available as part of the configuration")

// ErrReceiversNotAMap indicates that the receivers property isn't a map of values.
ErrReceiversNotAMap = errors.New("receivers property in the configuration doesn't contain valid receivers")

// ErrExportersNotAMap indicates that the exporters property isn't a map of values.
ErrExportersNotAMap = errors.New("exporters property in the configuration doesn't contain valid exporters")
)

// ConfigToExporterPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToExporterPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// ConfigToComponentPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToComponentPorts(logger logr.Logger, componentsName string, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the exporters and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
Expand All @@ -52,119 +39,71 @@ func ConfigToExporterPorts(logger logr.Logger, config map[interface{}]interface{
// the exporter-qualifier is what comes after the slash in the exporter name, but typically nil
// examples:
// ```yaml
// exporters:
// exampleexporter:
// components:
// componentexample:
// endpoint: 0.0.0.0:12345
// exampleexporter/settings:
// componentexample/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have two ports, named: "exampleexporter" and "exampleexporter-settings"
exportersProperty, ok := config["exporters"]
// in this case, we have 2 ports, named: "componentexample" and "componentexample-settings"
componentsProperty, ok := config[componentsName]
if !ok {
return nil, ErrNoExporters
}
expEnabled := GetEnabledExporters(logger, config)
if expEnabled == nil {
return nil, ErrExportersNotAMap
return nil, fmt.Errorf("no %s available as part of the configuration", componentsName)
}
exporters, ok := exportersProperty.(map[interface{}]interface{})

components, ok := componentsProperty.(map[interface{}]interface{})
if !ok {
return nil, ErrExportersNotAMap
return nil, fmt.Errorf("%s doesn't contain valid components", componentsName)
}

var compEnabled map[interface{}]bool

switch componentsName {
case "exporters":
compEnabled = GetEnabledExporters(logger, config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than switching on type here could we just call getEnabledComponents(cType, config)?

case "receivers":
compEnabled = GetEnabledReceivers(logger, config)
}

if compEnabled == nil {
return nil, fmt.Errorf("no enabled %s available as part of the configuration", componentsName)
}

ports := []corev1.ServicePort{}
for key, val := range exporters {
// This check will pass only the enabled exporters,
for key, val := range components {
// This check will pass only the enabled components,
// then only the related ports will be opened.
if !expEnabled[key] {
if !compEnabled[key] {
continue
}
exporter, ok := val.(map[interface{}]interface{})
if !ok {
logger.V(2).Info("exporter doesn't seem to be a map of properties", "exporter", key)
logger.V(2).Info("component doesn't seem to be a map of properties", componentsName[:len(componentsName)-2], key)
exporter = map[interface{}]interface{}{}
}

exprtName := key.(string)
exprtParser, err := exporterParser.For(logger, exprtName, exporter)
if err != nil {
logger.V(2).Info("no parser found for '%s'", exprtName)
continue
cmptName := key.(string)
var cmptParser parser.ComponentPortParser
var err error
switch componentsName {
case "exporters":
cmptParser, err = exporterParser.For(logger, cmptName, exporter)
case "receivers":
cmptParser, err = receiverParser.For(logger, cmptName, exporter)
}

exprtPorts, err := exprtParser.Ports()
if err != nil {
logger.Error(err, "parser for '%s' has returned an error: %w", exprtName, err)
continue
}

if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
}

sort.Slice(ports, func(i, j int) bool {
return ports[i].Name < ports[j].Name
})

return ports, nil
}

// ConfigToReceiverPorts converts the incoming configuration object into a set of service ports required by the receivers.
func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the receivers and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
// ${instance.Name}-${receiver.name}-${receiver.qualifier}
// the receiver-name is typically the node name from the receivers map
// the receiver-qualifier is what comes after the slash in the receiver name, but typically nil
// examples:
// ```yaml
// receivers:
// examplereceiver:
// endpoint: 0.0.0.0:12345
// examplereceiver/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have two ports, named: "examplereceiver" and "examplereceiver-settings"
receiversProperty, ok := config["receivers"]
if !ok {
return nil, ErrNoReceivers
}
recEnabled := GetEnabledReceivers(logger, config)
if recEnabled == nil {
return nil, ErrReceiversNotAMap
}
receivers, ok := receiversProperty.(map[interface{}]interface{})
if !ok {
return nil, ErrReceiversNotAMap
}

ports := []corev1.ServicePort{}
for key, val := range receivers {
// This check will pass only the enabled receivers,
// then only the related ports will be opened.
if !recEnabled[key] {
logger.V(2).Info("no parser found for '%s'", cmptName)
continue
}
receiver, ok := val.(map[interface{}]interface{})
if !ok {
logger.Info("receiver doesn't seem to be a map of properties", "receiver", key)
receiver = map[interface{}]interface{}{}
}

rcvrName := key.(string)
rcvrParser := receiverParser.For(logger, rcvrName, receiver)

rcvrPorts, err := rcvrParser.Ports()
exprtPorts, err := cmptParser.Ports()
if err != nil {
// should we break the process and return an error, or just ignore this faulty parser
// and let the other parsers add their ports to the service? right now, the best
// option seems to be to log the failures and move on, instead of failing them all
logger.Error(err, "parser for '%s' has returned an error: %w", rcvrName, err)
logger.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
continue
}

if len(rcvrPorts) > 0 {
ports = append(ports, rcvrPorts...)
if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
}

Expand All @@ -176,12 +115,12 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{
}

func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) []corev1.ServicePort {
ports, err := ConfigToReceiverPorts(logger, config)
ports, err := ConfigToComponentPorts(logger, "receivers", config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the receivers")
}

exporterPorts, err := ConfigToExporterPorts(logger, config)
exporterPorts, err := ConfigToComponentPorts(logger, "exporters", config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the exporters")
}
Expand Down
15 changes: 8 additions & 7 deletions internal/manifests/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
)

Expand Down Expand Up @@ -83,7 +84,7 @@ func TestExtractPortsFromConfig(t *testing.T) {
require.NotEmpty(t, config)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, "receivers", config)
assert.NoError(t, err)
assert.Len(t, ports, 10)

Expand Down Expand Up @@ -116,12 +117,12 @@ func TestNoPortsParsed(t *testing.T) {
configStr string
}{
{
expected: adapters.ErrNoReceivers,
expected: errors.New("no receivers available as part of the configuration"),
desc: "empty",
configStr: "",
},
{
expected: adapters.ErrReceiversNotAMap,
expected: errors.New("receivers doesn't contain valid components"),
desc: "not a map",
configStr: "receivers: some-string",
},
Expand All @@ -132,7 +133,7 @@ func TestNoPortsParsed(t *testing.T) {
require.NoError(t, err)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, "receivers", config)

// verify
assert.Nil(t, ports)
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestInvalidReceivers(t *testing.T) {
require.NoError(t, err)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, "receivers", config)

// verify
assert.NoError(t, err)
Expand All @@ -179,7 +180,7 @@ func TestParserFailed(t *testing.T) {
return nil, errors.New("mocked error")
},
}
receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) receiver.ReceiverParser {
receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
return mockParser
})

Expand All @@ -197,7 +198,7 @@ func TestParserFailed(t *testing.T) {
}

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, "receivers", config)

// verify
assert.Len(t, ports, 0)
Expand Down
2 changes: 1 addition & 1 deletion internal/manifests/collector/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func servicePortsFromCfg(logger logr.Logger, otelcol v1alpha1.OpenTelemetryColle
return nil
}

ports, err := adapters.ConfigToReceiverPorts(logger, configFromString)
ports, err := adapters.ConfigToComponentPorts(logger, "receivers", configFromString)
if err != nil {
logger.Error(err, "couldn't build the ingress for this instance")
}
Expand Down
23 changes: 6 additions & 17 deletions internal/manifests/collector/parser/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,20 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
)

// ExporterParser is an interface that should be implemented by all exporter parsers.
type ExporterParser interface {
// Ports returns the service ports parsed based on the exporter's configuration
Ports() ([]corev1.ServicePort, error)

// ParserName returns the name of this parser
ParserName() string
}

// Builder specifies the signature required for parser builders.
type Builder func(logr.Logger, string, map[interface{}]interface{}) ExporterParser

// registry holds a record of all known parsers.
var registry = make(map[string]Builder)
// registry holds a record of all known exporter parsers.
var registry = make(map[string]parser.Builder)

// BuilderFor returns a parser builder for the given exporter name.
func BuilderFor(name string) Builder {
func BuilderFor(name string) parser.Builder {
return registry[exporterType(name)]
}

// For returns a new parser for the given exporter name + config.
func For(logger logr.Logger, name string, config map[interface{}]interface{}) (ExporterParser, error) {
func For(logger logr.Logger, name string, config map[interface{}]interface{}) (parser.ComponentPortParser, error) {
builder := BuilderFor(name)
if builder == nil {
return nil, fmt.Errorf("no builders for %s", name)
Expand All @@ -58,7 +47,7 @@ func For(logger logr.Logger, name string, config map[interface{}]interface{}) (E
}

// Register adds a new parser builder to the list of known builders.
func Register(name string, builder Builder) {
func Register(name string, builder parser.Builder) {
registry[name] = builder
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
)

var _ ExporterParser = &PrometheusExporterParser{}
var _ parser.ComponentPortParser = &PrometheusExporterParser{}

const (
parserNamePrometheus = "__prometheus"
Expand All @@ -37,7 +38,7 @@ type PrometheusExporterParser struct {
}

// NewPrometheusExporterParser builds a new parser for OTLP receivers.
func NewPrometheusExporterParser(logger logr.Logger, name string, config map[interface{}]interface{}) ExporterParser {
func NewPrometheusExporterParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
return &PrometheusExporterParser{
logger: logger,
name: name,
Expand Down
31 changes: 31 additions & 0 deletions internal/manifests/collector/parser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package parser

import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
)

type ComponentPortParser interface {
// Ports returns the service ports parsed based on the exporter's configuration
Ports() ([]corev1.ServicePort, error)

// ParserName returns the name of this parser
ParserName() string
}

// Builder specifies the signature required for parser builders.
type Builder func(logr.Logger, string, map[interface{}]interface{}) ComponentPortParser
Loading
Loading