Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Create a single interface for the Receiver and Exporter parsers #2287

Merged
merged 20 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
85f47cb
Create a single interface for the Receiver and Exporter parsers
iblancasa Oct 27, 2023
dab0145
Fix lint
iblancasa Oct 27, 2023
d2c8f97
Use ComponentType
iblancasa Oct 30, 2023
f72dc0e
Merge branch 'main' into task/2008
iblancasa Nov 2, 2023
8369cd6
Merge branch 'task/2008' of github.com:iblancasa/opentelemetry-operat…
iblancasa Nov 2, 2023
e10a1d0
Merge branch 'main' into task/2008
iblancasa Nov 6, 2023
444f112
Merge branch 'main' into task/2008
iblancasa Nov 6, 2023
282504a
Merge branch 'task/2008' of github.com:iblancasa/opentelemetry-operat…
iblancasa Nov 6, 2023
8862610
Apply changes requested in code review
iblancasa Nov 6, 2023
333d5f5
Merge branch 'main' into task/2008
iblancasa Nov 13, 2023
b12d8da
Merge branch 'main' into task/2008
iblancasa Nov 14, 2023
4991ccb
Merge branch 'main' of github.com:open-telemetry/opentelemetry-operat…
iblancasa Nov 20, 2023
58d99df
Merge branch 'task/2008' of github.com:iblancasa/opentelemetry-operat…
iblancasa Nov 20, 2023
d5d53e8
Simplify getting the enabled components
iblancasa Nov 20, 2023
60a0409
Merge branch 'main' of github.com:open-telemetry/opentelemetry-operat…
iblancasa Nov 20, 2023
51cc28b
Merge branch 'main' of github.com:open-telemetry/opentelemetry-operat…
iblancasa Nov 20, 2023
3ee5b15
Fix lint
iblancasa Nov 20, 2023
a21a207
Merge branch 'main' into task/2008
iblancasa Nov 23, 2023
7d835fe
Merge branch 'main' into task/2008
iblancasa Nov 27, 2023
2a52309
Merge branch 'main' into task/2008
iblancasa Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apis/v1alpha1/collector_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func TestOTELColDefaultingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
cvw := &CollectorWebhook{
logger: logr.Discard(),
Expand Down Expand Up @@ -786,6 +787,7 @@ func TestOTELColValidatingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
cvw := &CollectorWebhook{
logger: logr.Discard(),
Expand Down
1 change: 1 addition & 0 deletions apis/v1alpha1/instrumentation_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func TestInstrumentationValidatingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
if test.err == "" {
Expand Down
2 changes: 2 additions & 0 deletions apis/v1alpha1/opampbridge_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestOpAMPBridgeDefaultingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
webhook := &OpAMPBridgeWebhook{
logger: logr.Discard(),
Expand Down Expand Up @@ -294,6 +295,7 @@ func TestOpAMPBridgeValidatingWebhook(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
webhook := &OpAMPBridgeWebhook{
logger: logr.Discard(),
Expand Down
4 changes: 4 additions & 0 deletions controllers/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
testContext := context.Background()
nsn := types.NamespacedName{Name: tt.args.params.OtelCol.Name, Namespace: tt.args.params.OtelCol.Namespace}
Expand Down Expand Up @@ -557,6 +558,7 @@ func TestOpenTelemetryCollectorReconciler_Reconcile(t *testing.T) {
}
// run the next set of checks
for pid, updateParam := range tt.args.updates {
updateParam := updateParam
existing := v1alpha1.OpenTelemetryCollector{}
found, err := populateObjectIfExists(t, &existing, nsn)
assert.True(t, found)
Expand Down Expand Up @@ -676,6 +678,7 @@ func TestOpAMPBridgeReconciler_Reconcile(t *testing.T) {
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
testContext := context.Background()
nsn := types.NamespacedName{Name: tt.args.params.OpAMPBridge.Name, Namespace: tt.args.params.OpAMPBridge.Namespace}
Expand Down Expand Up @@ -710,6 +713,7 @@ func TestOpAMPBridgeReconciler_Reconcile(t *testing.T) {
}
// run the next set of checks
for pid, updateParam := range tt.args.updates {
updateParam := updateParam
existing := v1alpha1.OpAMPBridge{}
found, err := populateObjectIfExists(t, &existing, nsn)
assert.True(t, found)
Expand Down
149 changes: 46 additions & 103 deletions internal/manifests/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package adapters

import (
"errors"
"fmt"
"net"
"sort"
"strconv"
Expand All @@ -24,26 +24,24 @@ import (
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
exporterParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/exporter"
receiverParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
)

var (
// ErrNoExporters indicates that there are no exporters in the configuration.
ErrNoExporters = errors.New("no exporters available as part of the configuration")
type ComponentType int

// ErrNoReceivers indicates that there are no receivers in the configuration.
ErrNoReceivers = errors.New("no receivers available as part of the configuration")

// ErrReceiversNotAMap indicates that the receivers property isn't a map of values.
ErrReceiversNotAMap = errors.New("receivers property in the configuration doesn't contain valid receivers")

// ErrExportersNotAMap indicates that the exporters property isn't a map of values.
ErrExportersNotAMap = errors.New("exporters property in the configuration doesn't contain valid exporters")
const (
ComponentTypeReceiver ComponentType = iota
ComponentTypeExporter
)

// ConfigToExporterPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToExporterPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
func (c ComponentType) String() string {
return [...]string{"receiver", "exporter"}[c]
}

// ConfigToComponentPorts converts the incoming configuration object into a set of service ports required by the exporters.
func ConfigToComponentPorts(logger logr.Logger, cType ComponentType, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the exporters and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
Expand All @@ -52,119 +50,64 @@ func ConfigToExporterPorts(logger logr.Logger, config map[interface{}]interface{
// the exporter-qualifier is what comes after the slash in the exporter name, but typically nil
// examples:
// ```yaml
// exporters:
// exampleexporter:
// components:
// componentexample:
// endpoint: 0.0.0.0:12345
// exampleexporter/settings:
// componentexample/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have two ports, named: "exampleexporter" and "exampleexporter-settings"
exportersProperty, ok := config["exporters"]
// in this case, we have 2 ports, named: "componentexample" and "componentexample-settings"
componentsProperty, ok := config[fmt.Sprintf("%ss", cType.String())]
if !ok {
return nil, ErrNoExporters
}
expEnabled := GetEnabledExporters(logger, config)
if expEnabled == nil {
return nil, ErrExportersNotAMap
return nil, fmt.Errorf("no %ss available as part of the configuration", cType)
}
exporters, ok := exportersProperty.(map[interface{}]interface{})

components, ok := componentsProperty.(map[interface{}]interface{})
if !ok {
return nil, ErrExportersNotAMap
return nil, fmt.Errorf("%ss doesn't contain valid components", cType.String())
}

compEnabled := getEnabledComponents(config, cType)

if compEnabled == nil {
return nil, fmt.Errorf("no enabled %ss available as part of the configuration", cType)
}

ports := []corev1.ServicePort{}
for key, val := range exporters {
// This check will pass only the enabled exporters,
for key, val := range components {
// This check will pass only the enabled components,
// then only the related ports will be opened.
if !expEnabled[key] {
if !compEnabled[key] {
continue
}
exporter, ok := val.(map[interface{}]interface{})
if !ok {
logger.V(2).Info("exporter doesn't seem to be a map of properties", "exporter", key)
logger.V(2).Info("component doesn't seem to be a map of properties", cType.String(), key)
exporter = map[interface{}]interface{}{}
}

exprtName := key.(string)
exprtParser, err := exporterParser.For(logger, exprtName, exporter)
if err != nil {
logger.V(2).Info("no parser found for '%s'", exprtName)
continue
cmptName := key.(string)
var cmptParser parser.ComponentPortParser
var err error
switch cType {
case ComponentTypeExporter:
cmptParser, err = exporterParser.For(logger, cmptName, exporter)
case ComponentTypeReceiver:
cmptParser, err = receiverParser.For(logger, cmptName, exporter)
iblancasa marked this conversation as resolved.
Show resolved Hide resolved
}

exprtPorts, err := exprtParser.Ports()
if err != nil {
logger.Error(err, "parser for '%s' has returned an error: %w", exprtName, err)
logger.V(2).Info("no parser found for '%s'", cmptName)
continue
}

if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
}

sort.Slice(ports, func(i, j int) bool {
return ports[i].Name < ports[j].Name
})

return ports, nil
}

// ConfigToReceiverPorts converts the incoming configuration object into a set of service ports required by the receivers.
func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) {
// now, we gather which ports we might need to open
// for that, we get all the receivers and check their `endpoint` properties,
// extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern:
// ${instance.Name}-${receiver.name}-${receiver.qualifier}
// the receiver-name is typically the node name from the receivers map
// the receiver-qualifier is what comes after the slash in the receiver name, but typically nil
// examples:
// ```yaml
// receivers:
// examplereceiver:
// endpoint: 0.0.0.0:12345
// examplereceiver/settings:
// endpoint: 0.0.0.0:12346
// in this case, we have two ports, named: "examplereceiver" and "examplereceiver-settings"
receiversProperty, ok := config["receivers"]
if !ok {
return nil, ErrNoReceivers
}
recEnabled := GetEnabledReceivers(logger, config)
if recEnabled == nil {
return nil, ErrReceiversNotAMap
}
receivers, ok := receiversProperty.(map[interface{}]interface{})
if !ok {
return nil, ErrReceiversNotAMap
}

ports := []corev1.ServicePort{}
for key, val := range receivers {
// This check will pass only the enabled receivers,
// then only the related ports will be opened.
if !recEnabled[key] {
continue
}
receiver, ok := val.(map[interface{}]interface{})
if !ok {
logger.Info("receiver doesn't seem to be a map of properties", "receiver", key)
receiver = map[interface{}]interface{}{}
}

rcvrName := key.(string)
rcvrParser := receiverParser.For(logger, rcvrName, receiver)

rcvrPorts, err := rcvrParser.Ports()
exprtPorts, err := cmptParser.Ports()
if err != nil {
// should we break the process and return an error, or just ignore this faulty parser
// and let the other parsers add their ports to the service? right now, the best
// option seems to be to log the failures and move on, instead of failing them all
logger.Error(err, "parser for '%s' has returned an error: %w", rcvrName, err)
logger.Error(err, "parser for '%s' has returned an error: %w", cmptName, err)
continue
}

if len(rcvrPorts) > 0 {
ports = append(ports, rcvrPorts...)
if len(exprtPorts) > 0 {
ports = append(ports, exprtPorts...)
}
}

Expand All @@ -176,12 +119,12 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{
}

func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) []corev1.ServicePort {
ports, err := ConfigToReceiverPorts(logger, config)
ports, err := ConfigToComponentPorts(logger, ComponentTypeReceiver, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the receivers")
}

exporterPorts, err := ConfigToExporterPorts(logger, config)
exporterPorts, err := ConfigToComponentPorts(logger, ComponentTypeExporter, config)
if err != nil {
logger.Error(err, "there was a problem while getting the ports from the exporters")
}
Expand Down
15 changes: 8 additions & 7 deletions internal/manifests/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser"
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver"
)

Expand Down Expand Up @@ -83,7 +84,7 @@ func TestExtractPortsFromConfig(t *testing.T) {
require.NotEmpty(t, config)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)
assert.NoError(t, err)
assert.Len(t, ports, 10)

Expand Down Expand Up @@ -116,12 +117,12 @@ func TestNoPortsParsed(t *testing.T) {
configStr string
}{
{
expected: adapters.ErrNoReceivers,
expected: errors.New("no receivers available as part of the configuration"),
desc: "empty",
configStr: "",
},
{
expected: adapters.ErrReceiversNotAMap,
expected: errors.New("receivers doesn't contain valid components"),
desc: "not a map",
configStr: "receivers: some-string",
},
Expand All @@ -132,7 +133,7 @@ func TestNoPortsParsed(t *testing.T) {
require.NoError(t, err)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)

// verify
assert.Nil(t, ports)
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestInvalidReceivers(t *testing.T) {
require.NoError(t, err)

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)

// verify
assert.NoError(t, err)
Expand All @@ -179,7 +180,7 @@ func TestParserFailed(t *testing.T) {
return nil, errors.New("mocked error")
},
}
receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) receiver.ReceiverParser {
receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser {
return mockParser
})

Expand All @@ -197,7 +198,7 @@ func TestParserFailed(t *testing.T) {
}

// test
ports, err := adapters.ConfigToReceiverPorts(logger, config)
ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config)

// verify
assert.Len(t, ports, 0)
Expand Down
21 changes: 6 additions & 15 deletions internal/manifests/collector/adapters/config_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,13 @@

package adapters

import (
"github.com/go-logr/logr"
)
import "fmt"

// Following Otel Doc: Configuring a receiver does not enable it. The receivers are enabled via pipelines within the service section.
// GetEnabledReceivers returns all enabled receivers as a true flag set. If it can't find any receiver, it will return a nil interface.
func GetEnabledReceivers(_ logr.Logger, config map[interface{}]interface{}) map[interface{}]bool {
return getEnabledComponents(config, "receivers")
}

func GetEnabledExporters(_ logr.Logger, config map[interface{}]interface{}) map[interface{}]bool {
return getEnabledComponents(config, "exporters")
}

func getEnabledComponents(config map[interface{}]interface{}, componentType string) map[interface{}]bool {
cfgComponents, ok := config[componentType]
// getEnabledComponents returns all enabled components as a true flag set. If it can't find any receiver, it will return a nil interface.
func getEnabledComponents(config map[interface{}]interface{}, componentType ComponentType) map[interface{}]bool {
componentTypePlural := fmt.Sprintf("%ss", componentType)
cfgComponents, ok := config[componentTypePlural]
if !ok {
return nil
}
Expand Down Expand Up @@ -85,7 +76,7 @@ func getEnabledComponents(config map[interface{}]interface{}, componentType stri
return nil
}
for pipSpecID, pipSpecCfg := range pipelineDesc {
if pipSpecID.(string) == componentType {
if pipSpecID.(string) == componentTypePlural {
receiversList, ok := pipSpecCfg.([]interface{})
if !ok {
continue
Expand Down
Loading
Loading