Skip to content

Commit

Permalink
[chore] Create a single interface for the Receiver and Exporter parse…
Browse files Browse the repository at this point in the history
…rs (#2287)

* Create a single interface for the Receiver and Exporter parsers

Signed-off-by: Israel Blancas <[email protected]>

* Fix lint

Signed-off-by: Israel Blancas <[email protected]>

* Use ComponentType

Signed-off-by: Israel Blancas <[email protected]>

* Apply changes requested in code review

Signed-off-by: Israel Blancas <[email protected]>

* Simplify getting the enabled components

Signed-off-by: Israel Blancas <[email protected]>

* Fix lint

Signed-off-by: Israel Blancas <[email protected]>

---------

Signed-off-by: Israel Blancas <[email protected]>
  • Loading branch information
iblancasa authored Nov 27, 2023
1 parent ee0790f commit 7c11add
Show file tree
Hide file tree
Showing 38 changed files with 220 additions and 209 deletions.
2 changes: 2 additions & 0 deletions apis/v1alpha1/collector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func TestOTELColDefaultingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
cvw := &CollectorWebhook{
logger: logr.Discard(),
Expand Down Expand Up @@ -786,6 +787,7 @@ func TestOTELColValidatingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
cvw := &CollectorWebhook{
logger: logr.Discard(),
Expand Down
1 change: 1 addition & 0 deletions apis/v1alpha1/instrumentation_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func TestInstrumentationValidatingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
if test.err == "" {
Expand Down
2 changes: 2 additions & 0 deletions apis/v1alpha1/opampbridge_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestOpAMPBridgeDefaultingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
webhook := &OpAMPBridgeWebhook{
logger: logr.Discard(),
Expand Down Expand Up @@ -294,6 +295,7 @@ func TestOpAMPBridgeValidatingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
webhook := &OpAMPBridgeWebhook{
logger: logr.Discard(),
Expand Down
4 changes: 4 additions & 0 deletions controllers/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
testContext := context.Background()
nsn := types.NamespacedName{Name: tt.args.params.OtelCol.Name, Namespace: tt.args.params.OtelCol.Namespace}
Expand Down Expand Up @@ -557,6 +558,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
}
// run the next set of checks
for pid, updateParam := range tt.args.updates {
updateParam := updateParam
existing := v1alpha1.OpenTelemetryCollector{}
found, err := populateObjectIfExists(t, &existing, nsn)
assert.True(t, found)
Expand Down Expand Up @@ -676,6 +678,7 @@ func TestOpAMPBridgeReconciler_Reconcile(t *testing.T) {
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
testContext := context.Background()
nsn := types.NamespacedName{Name: tt.args.params.OpAMPBridge.Name, Namespace: tt.args.params.OpAMPBridge.Namespace}
Expand Down Expand Up @@ -710,6 +713,7 @@ func TestOpAMPBridgeReconciler_Reconcile(t *testing.T) {
}
// run the next set of checks
for pid, updateParam := range tt.args.updates {
updateParam := updateParam
existing := v1alpha1.OpAMPBridge{}
found, err := populateObjectIfExists(t, &existing, nsn)
assert.True(t, found)
Expand Down
149 changes: 46 additions & 103 deletions internal/manifests/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package adapters

import (
"errors"
"fmt"
"net"
"sort"
"strconv"
Expand All @@ -24,26 +24,24 @@ import (
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
exporterParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/exporter"
receiverParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
)

var (
// ErrNoExporters indicates that there are no exporters in the configuration.
ErrNoExporters = errors.New("no exporters available as part of the configuration")
type ComponentType int

// ErrNoReceivers indicates that there are no receivers in the configuration.
ErrNoReceivers = errors.New("no receivers available as part of the configuration")

// ErrReceiversNotAMap indicates that the receivers property isn't a map of values.
ErrReceiversNotAMap = errors.New("receivers property in the configuration doesn't contain valid receivers")

// ErrExportersNotAMap indicates that the exporters property isn't a map of values.
ErrExportersNotAMap = errors.New("exporters property in the configuration doesn't contain valid exporters")
const (
ComponentTypeReceiver ComponentType = iota
ComponentTypeExporter
)

// ConfigToExporterPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToExporterPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
func (c ComponentType) String() string {
return [...]string{"receiver", "exporter"}[c]
}

// ConfigToComponentPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToComponentPorts(logger logr.Logger, cType ComponentType, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the exporters and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
Expand All @@ -52,119 +50,64 @@ func ConfigToExporterPorts(logger logr.Logger, config map[interface{}]interface{
// the exporter-qualifier is what comes after the slash in the exporter name, but typically nil
// examples:
// ```yaml
// exporters:
// exampleexporter:
// components:
// componentexample:
// endpoint: 0.0.0.0:12345
// exampleexporter/settings:
// componentexample/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have two ports, named: "exampleexporter" and "exampleexporter-settings"
exportersProperty, ok := config["exporters"]
// in this case, we have 2 ports, named: "componentexample" and "componentexample-settings"
componentsProperty, ok := config[fmt.Sprintf("%ss", cType.String())]
if !ok {
return nil, ErrNoExporters
}
expEnabled := GetEnabledExporters(logger, config)
if expEnabled == nil {
return nil, ErrExportersNotAMap
return nil, fmt.Errorf("no %ss available as part of the configuration", cType)
}
exporters, ok := exportersProperty.(map[interface{}]interface{})

components, ok := componentsProperty.(map[interface{}]interface{})
if !ok {
return nil, ErrExportersNotAMap
return nil, fmt.Errorf("%ss doesn't contain valid components", cType.String())
}

compEnabled := getEnabledComponents(config, cType)

if compEnabled == nil {
return nil, fmt.Errorf("no enabled %ss available as part of the configuration", cType)
}

ports := []corev1.ServicePort{}
for key, val := range exporters {
// This check will pass only the enabled exporters,
for key, val := range components {
// This check will pass only the enabled components,
// then only the related ports will be opened.
if !expEnabled[key] {
if !compEnabled[key] {
continue
}
exporter, ok := val.(map[interface{}]interface{})
if !ok {
logger.V(2).Info("exporter doesn't seem to be a map of properties", "exporter", key)
logger.V(2).Info("component doesn't seem to be a map of properties", cType.String(), key)
exporter = map[interface{}]interface{}{}
}

exprtName := key.(string)
exprtParser, err := exporterParser.For(logger, exprtName, exporter)
if err != nil {
logger.V(2).Info("no parser found for '%s'", exprtName)
continue
cmptName := key.(string)
var cmptParser parser.ComponentPortParser
var err error
switch cType {
case ComponentTypeExporter:
cmptParser, err = exporterParser.For(logger, cmptName, exporter)
case ComponentTypeReceiver:
cmptParser, err = receiverParser.For(logger, cmptName, exporter)
}

exprtPorts, err := exprtParser.Ports()
if err != nil {
logger.Error(err, "parser for '%s' has returned an error: %w", exprtName, err)
logger.V(2).Info("no parser found for '%s'", cmptName)
continue
}

if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
}

sort.Slice(ports, func(i, j int) bool {
return ports[i].Name < ports[j].Name
})

return ports, nil
}

// ConfigToReceiverPorts converts the incoming configuration object into a set of service ports required by the receivers.
func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the receivers and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
// ${instance.Name}-${receiver.name}-${receiver.qualifier}
// the receiver-name is typically the node name from the receivers map
// the receiver-qualifier is what comes after the slash in the receiver name, but typically nil
// examples:
// ```yaml
// receivers:
// examplereceiver:
// endpoint: 0.0.0.0:12345
// examplereceiver/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have two ports, named: "examplereceiver" and "examplereceiver-settings"
receiversProperty, ok := config["receivers"]
if !ok {
return nil, ErrNoReceivers
}
recEnabled := GetEnabledReceivers(logger, config)
if recEnabled == nil {
return nil, ErrReceiversNotAMap
}
receivers, ok := receiversProperty.(map[interface{}]interface{})
if !ok {
return nil, ErrReceiversNotAMap
}

ports := []corev1.ServicePort{}
for key, val := range receivers {
// This check will pass only the enabled receivers,
// then only the related ports will be opened.
if !recEnabled[key] {
continue
}
receiver, ok := val.(map[interface{}]interface{})
if !ok {
logger.Info("receiver doesn't seem to be a map of properties", "receiver", key)
receiver = map[interface{}]interface{}{}
}

rcvrName := key.(string)
rcvrParser := receiverParser.For(logger, rcvrName, receiver)

rcvrPorts, err := rcvrParser.Ports()
exprtPorts, err := cmptParser.Ports()
if err != nil {
// should we break the process and return an error, or just ignore this faulty parser
// and let the other parsers add their ports to the service? right now, the best
// option seems to be to log the failures and move on, instead of failing them all
logger.Error(err, "parser for '%s' has returned an error: %w", rcvrName, err)
logger.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
continue
}

if len(rcvrPorts) > 0 {
ports = append(ports, rcvrPorts...)
if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
}

Expand All @@ -176,12 +119,12 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{
}

func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) []corev1.ServicePort {
ports, err := ConfigToReceiverPorts(logger, config)
ports, err := ConfigToComponentPorts(logger, ComponentTypeReceiver, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the receivers")
}

exporterPorts, err := ConfigToExporterPorts(logger, config)
exporterPorts, err := ConfigToComponentPorts(logger, ComponentTypeExporter, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the exporters")
}
Expand Down
15 changes: 8 additions & 7 deletions internal/manifests/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
)

Expand Down Expand Up @@ -83,7 +84,7 @@ func TestExtractPortsFromConfig(t *testing.T) {
require.NotEmpty(t, config)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)
assert.NoError(t, err)
assert.Len(t, ports, 10)

Expand Down Expand Up @@ -116,12 +117,12 @@ func TestNoPortsParsed(t *testing.T) {
configStr string
}{
{
expected: adapters.ErrNoReceivers,
expected: errors.New("no receivers available as part of the configuration"),
desc: "empty",
configStr: "",
},
{
expected: adapters.ErrReceiversNotAMap,
expected: errors.New("receivers doesn't contain valid components"),
desc: "not a map",
configStr: "receivers: some-string",
},
Expand All @@ -132,7 +133,7 @@ func TestNoPortsParsed(t *testing.T) {
require.NoError(t, err)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)

// verify
assert.Nil(t, ports)
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestInvalidReceivers(t *testing.T) {
require.NoError(t, err)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)

// verify
assert.NoError(t, err)
Expand All @@ -179,7 +180,7 @@ func TestParserFailed(t *testing.T) {
return nil, errors.New("mocked error")
},
}
receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) receiver.ReceiverParser {
receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
return mockParser
})

Expand All @@ -197,7 +198,7 @@ func TestParserFailed(t *testing.T) {
}

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)

// verify
assert.Len(t, ports, 0)
Expand Down
21 changes: 6 additions & 15 deletions internal/manifests/collector/adapters/config_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,13 @@

package adapters

import (
"github.com/go-logr/logr"
)
import "fmt"

// Following Otel Doc: Configuring a receiver does not enable it. The receivers are enabled via pipelines within the service section.
// GetEnabledReceivers returns all enabled receivers as a true flag set. If it can't find any receiver, it will return a nil interface.
func GetEnabledReceivers(_ logr.Logger, config map[interface{}]interface{}) map[interface{}]bool {
return getEnabledComponents(config, "receivers")
}

func GetEnabledExporters(_ logr.Logger, config map[interface{}]interface{}) map[interface{}]bool {
return getEnabledComponents(config, "exporters")
}

func getEnabledComponents(config map[interface{}]interface{}, componentType string) map[interface{}]bool {
cfgComponents, ok := config[componentType]
// getEnabledComponents returns all enabled components as a true flag set. If it can't find any receiver, it will return a nil interface.
func getEnabledComponents(config map[interface{}]interface{}, componentType ComponentType) map[interface{}]bool {
componentTypePlural := fmt.Sprintf("%ss", componentType)
cfgComponents, ok := config[componentTypePlural]
if !ok {
return nil
}
Expand Down Expand Up @@ -85,7 +76,7 @@ func getEnabledComponents(config map[interface{}]interface{}, componentType stri
return nil
}
for pipSpecID, pipSpecCfg := range pipelineDesc {
if pipSpecID.(string) == componentType {
if pipSpecID.(string) == componentTypePlural {
receiversList, ok := pipSpecCfg.([]interface{})
if !ok {
continue
Expand Down
Loading

0 comments on commit 7c11add

Please sign in to comment.