From 38ddff74862bf7e6cd90eb7be5e69977532a0bc4 Mon Sep 17 00:00:00 2001 From: Kristina Pathak Date: Wed, 31 Aug 2022 17:38:18 -0700 Subject: [PATCH 1/4] expose container ports --- apis/v1alpha1/opentelemetrycollector_types.go | 7 + ...ntelemetry.io_opentelemetrycollectors.yaml | 43 ++++ ...ntelemetry.io_opentelemetrycollectors.yaml | 43 ++++ docs/api.md | 68 +++++++ pkg/collector/adapters/config_to_ports.go | 66 +++++++ .../adapters/config_to_ports_test.go | 72 +++++-- pkg/collector/container.go | 116 +++++++++++ pkg/collector/container_test.go | 185 ++++++++++++++++++ pkg/collector/parser/receiver.go | 57 ++++++ pkg/collector/parser/receiver_generic.go | 19 ++ pkg/collector/parser/receiver_generic_test.go | 79 ++++++-- pkg/collector/parser/receiver_jaeger.go | 65 ++++++ pkg/collector/parser/receiver_jaeger_test.go | 111 ++++++++--- pkg/collector/parser/receiver_otlp.go | 60 ++++++ pkg/collector/parser/receiver_otlp_test.go | 108 +++++++--- pkg/collector/parser/receiver_test.go | 43 +++- 16 files changed, 1042 insertions(+), 100 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 197162ac47..0f3a982867 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -87,6 +87,13 @@ type OpenTelemetryCollectorSpec struct { // +optional // +listType=atomic Ports []v1.ServicePort `json:"ports,omitempty"` + // ContainerPorts allows a set of ports to be exposed by the underlying v1.Container. By default, the operator + // will attempt to infer the required ports by parsing the .Spec.Config property but this property can be + // used to open additional ports that can't be inferred by the operator, like for custom receivers. Any ports + // specified here will overwrite inferred ports if they have the same name. + // +optional + // +listType=atomic + ContainerPorts []v1.ContainerPort `json:"containerPorts,omitempty"` // ENV vars to set on the OpenTelemetry Collector's Pods. These can then in certain cases be // consumed in the config file for the Collector. // +optional diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index c1e4aa95ae..9ca8e35940 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -194,6 +194,49 @@ spec: configuration. Refer to the OpenTelemetry Collector documentation for details. type: string + containerPorts: + description: ContainerPorts allows a set of ports to be exposed by + the underlying v1.Container. By default, the operator will attempt + to infer the required ports by parsing the .Spec.Config property + but this property can be used to open additional ports that can't + be inferred by the operator, like for custom receivers. Any ports + specified here will overwrite inferred ports if they have the same + name. + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array + x-kubernetes-list-type: atomic env: description: ENV vars to set on the OpenTelemetry Collector's Pods. These can then in certain cases be consumed in the config file for diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index 291ce4177e..8f709b8ece 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -192,6 +192,49 @@ spec: configuration. Refer to the OpenTelemetry Collector documentation for details. type: string + containerPorts: + description: ContainerPorts allows a set of ports to be exposed by + the underlying v1.Container. By default, the operator will attempt + to infer the required ports by parsing the .Spec.Config property + but this property can be used to open additional ports that can't + be inferred by the operator, like for custom receivers. Any ports + specified here will overwrite inferred ports if they have the same + name. + items: + description: ContainerPort represents a network port in a single + container. + properties: + containerPort: + description: Number of port to expose on the pod's IP address. + This must be a valid port number, 0 < x < 65536. + format: int32 + type: integer + hostIP: + description: What host IP to bind the external port to. + type: string + hostPort: + description: Number of port to expose on the host. If specified, + this must be a valid port number, 0 < x < 65536. If HostNetwork + is specified, this must match ContainerPort. Most containers + do not need this. + format: int32 + type: integer + name: + description: If specified, this must be an IANA_SVC_NAME and + unique within the pod. Each named port in a pod must have + a unique name. Name for the port that can be referred to by + services. + type: string + protocol: + default: TCP + description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults + to "TCP". + type: string + required: + - containerPort + type: object + type: array + x-kubernetes-list-type: atomic env: description: ENV vars to set on the OpenTelemetry Collector's Pods. These can then in certain cases be consumed in the config file for diff --git a/docs/api.md b/docs/api.md index cfcf113717..23987bb7ce 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1705,6 +1705,13 @@ OpenTelemetryCollectorSpec defines the desired state of OpenTelemetryCollector. Config is the raw JSON to be used as the collector's configuration. Refer to the OpenTelemetry Collector documentation for details.
false + + containerPorts + []object + + ContainerPorts allows a set of ports to be exposed by the underlying v1.Container. By default, the operator will attempt to infer the required ports by parsing the .Spec.Config property but this property can be used to open additional ports that can't be inferred by the operator, like for custom receivers. Any ports specified here will overwrite inferred ports if they have the same name.
+ + false env []object @@ -2119,6 +2126,67 @@ HPAScalingPolicy is a single policy which must hold true for a specified past in +### OpenTelemetryCollector.spec.containerPorts[index] +[↩ Parent](#opentelemetrycollectorspec) + + + +ContainerPort represents a network port in a single container. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
NameTypeDescriptionRequired
containerPortinteger + Number of port to expose on the pod's IP address. This must be a valid port number, 0 < x < 65536.
+
+ Format: int32
+
true
hostIPstring + What host IP to bind the external port to.
+
false
hostPortinteger + Number of port to expose on the host. If specified, this must be a valid port number, 0 < x < 65536. If HostNetwork is specified, this must match ContainerPort. Most containers do not need this.
+
+ Format: int32
+
false
namestring + If specified, this must be an IANA_SVC_NAME and unique within the pod. Each named port in a pod must have a unique name. Name for the port that can be referred to by services.
+
false
protocolstring + Protocol for port. Must be UDP, TCP, or SCTP. Defaults to "TCP".
+
+ Default: TCP
+
false
+ + ### OpenTelemetryCollector.spec.env[index] [↩ Parent](#opentelemetrycollectorspec) diff --git a/pkg/collector/adapters/config_to_ports.go b/pkg/collector/adapters/config_to_ports.go index c897bec832..89c5269a28 100644 --- a/pkg/collector/adapters/config_to_ports.go +++ b/pkg/collector/adapters/config_to_ports.go @@ -97,3 +97,69 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{ return ports, nil } + +// ConfigToContainerPorts converts the incoming configuration object into a set of container ports. +func ConfigToContainerPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ContainerPort, error) { + // now, we gather which ports we might need to open + // for that, we get all the receivers and check their `endpoint` properties, + // extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern: + // ${instance.Name}-${receiver.name}-${receiver.qualifier} + // the receiver-name is typically the node name from the receivers map + // the receiver-qualifier is what comes after the slash in the receiver name, but typically nil + // examples: + // ```yaml + // receivers: + // examplereceiver: + // endpoint: 0.0.0.0:12345 + // examplereceiver/settings: + // endpoint: 0.0.0.0:12346 + // in this case, we have two ports, named: "examplereceiver" and "examplereceiver-settings" + receiversProperty, ok := config["receivers"] + if !ok { + return nil, ErrNoReceivers + } + recEnabled := GetEnabledReceivers(logger, config) + if recEnabled == nil { + return nil, ErrReceiversNotAMap + } + receivers, ok := receiversProperty.(map[interface{}]interface{}) + if !ok { + return nil, ErrReceiversNotAMap + } + + ports := []corev1.ContainerPort{} + for key, val := range receivers { + // This check will pass only the enabled receivers, + // then only the related ports will be opened. + if !recEnabled[key] { + continue + } + receiver, ok := val.(map[interface{}]interface{}) + if !ok { + logger.Info("receiver doesn't seem to be a map of properties", "receiver", key) + receiver = map[interface{}]interface{}{} + } + + rcvrName := key.(string) + rcvrParser := parser.For(logger, rcvrName, receiver) + + rcvrPorts, err := rcvrParser.ContainerPorts() + if err != nil { + // should we break the process and return an error, or just ignore this faulty parser + // and let the other parsers add their ports to the container? right now, the best + // option seems to be to log the failures and move on, instead of failing them all + logger.Error(err, "parser for '%s' has returned an error: %w", rcvrName, err) + continue + } + + if len(rcvrPorts) > 0 { + ports = append(ports, rcvrPorts...) + } + } + + sort.Slice(ports, func(i, j int) bool { + return ports[i].Name < ports[j].Name + }) + + return ports, nil +} diff --git a/pkg/collector/adapters/config_to_ports_test.go b/pkg/collector/adapters/config_to_ports_test.go index 84c10e3d83..41641fdcc1 100644 --- a/pkg/collector/adapters/config_to_ports_test.go +++ b/pkg/collector/adapters/config_to_ports_test.go @@ -31,8 +31,7 @@ import ( var logger = logf.Log.WithName("unit-tests") -func TestExtractPortsFromConfig(t *testing.T) { - configStr := `receivers: +var portConfigStr = `receivers: examplereceiver: endpoint: "0.0.0.0:12345" examplereceiver/settings: @@ -77,8 +76,9 @@ service: exporters: [logging] ` +func TestExtractPortsFromConfig(t *testing.T) { // prepare - config, err := adapters.ConfigFromString(configStr) + config, err := adapters.ConfigFromString(portConfigStr) require.NoError(t, err) require.NotEmpty(t, config) @@ -94,18 +94,48 @@ service: targetPort4317 := intstr.IntOrString{Type: 0, IntVal: 4317, StrVal: ""} targetPort4318 := intstr.IntOrString{Type: 0, IntVal: 4318, StrVal: ""} + expectedPorts := []corev1.ServicePort{ + {Name: "examplereceiver", Port: 12345}, + {Name: "examplereceiver-settings", Port: 12346}, + {Name: "jaeger-custom-thrift-http", AppProtocol: &httpAppProtocol, Protocol: "TCP", Port: 15268, TargetPort: targetPortZero}, + {Name: "jaeger-grpc", AppProtocol: &grpcAppProtocol, Protocol: "TCP", Port: 14250}, + {Name: "jaeger-thrift-binary", Protocol: "UDP", Port: 6833}, + {Name: "jaeger-thrift-compact", Protocol: "UDP", Port: 6831}, + {Name: "otlp-2-grpc", AppProtocol: &grpcAppProtocol, Protocol: "TCP", Port: 55555}, + {Name: "otlp-grpc", AppProtocol: &grpcAppProtocol, Port: 4317, TargetPort: targetPort4317}, + {Name: "otlp-http", AppProtocol: &httpAppProtocol, Port: 4318, TargetPort: targetPort4318}, + {Name: "otlp-http-legacy", AppProtocol: &httpAppProtocol, Port: 55681, TargetPort: targetPort4318}, + {Name: "zipkin", AppProtocol: &httpAppProtocol, Protocol: "TCP", Port: 9411}, + } + assert.ElementsMatch(t, expectedPorts, ports) +} + +func TestExtractContainerPortsFromConfig(t *testing.T) { + // prepare + config, err := adapters.ConfigFromString(portConfigStr) + require.NoError(t, err) + require.NotEmpty(t, config) + + // test + ports, err := adapters.ConfigToContainerPorts(logger, config) + assert.NoError(t, err) assert.Len(t, ports, 11) - assert.Equal(t, corev1.ServicePort{Name: "examplereceiver", Port: int32(12345)}, ports[0]) - assert.Equal(t, corev1.ServicePort{Name: "examplereceiver-settings", Port: int32(12346)}, ports[1]) - assert.Equal(t, corev1.ServicePort{Name: "jaeger-custom-thrift-http", AppProtocol: &httpAppProtocol, Protocol: "TCP", Port: int32(15268), TargetPort: targetPortZero}, ports[2]) - assert.Equal(t, corev1.ServicePort{Name: "jaeger-grpc", AppProtocol: &grpcAppProtocol, Protocol: "TCP", Port: int32(14250)}, ports[3]) - assert.Equal(t, corev1.ServicePort{Name: "jaeger-thrift-binary", Protocol: "UDP", Port: int32(6833)}, ports[4]) - assert.Equal(t, corev1.ServicePort{Name: "jaeger-thrift-compact", Protocol: "UDP", Port: int32(6831)}, ports[5]) - assert.Equal(t, corev1.ServicePort{Name: "otlp-2-grpc", AppProtocol: &grpcAppProtocol, Protocol: "TCP", Port: int32(55555)}, ports[6]) - assert.Equal(t, corev1.ServicePort{Name: "otlp-grpc", AppProtocol: &grpcAppProtocol, Port: int32(4317), TargetPort: targetPort4317}, ports[7]) - assert.Equal(t, corev1.ServicePort{Name: "otlp-http", AppProtocol: &httpAppProtocol, Port: int32(4318), TargetPort: targetPort4318}, ports[8]) - assert.Equal(t, corev1.ServicePort{Name: "otlp-http-legacy", AppProtocol: &httpAppProtocol, Port: int32(55681), TargetPort: targetPort4318}, ports[9]) - assert.Equal(t, corev1.ServicePort{Name: "zipkin", AppProtocol: &httpAppProtocol, Protocol: "TCP", Port: int32(9411)}, ports[10]) + + // verify + expectedPorts := []corev1.ContainerPort{ + {Name: "examplereceiver", ContainerPort: 12345}, + {Name: "examplereceiver-settings", ContainerPort: 12346}, + {Name: "jaeger-custom-thrift-http", Protocol: "TCP", ContainerPort: 15268}, + {Name: "jaeger-grpc", Protocol: "TCP", ContainerPort: 14250}, + {Name: "jaeger-thrift-binary", Protocol: "UDP", ContainerPort: 6833}, + {Name: "jaeger-thrift-compact", Protocol: "UDP", ContainerPort: 6831}, + {Name: "otlp-2-grpc", Protocol: "TCP", ContainerPort: 55555}, + {Name: "otlp-grpc", Protocol: "TCP", ContainerPort: 4317}, + {Name: "otlp-http", Protocol: "TCP", ContainerPort: 4318}, + {Name: "otlp-http-legacy", Protocol: "TCP", ContainerPort: 55681}, + {Name: "zipkin", Protocol: "TCP", ContainerPort: 9411}, + } + assert.ElementsMatch(t, expectedPorts, ports) } func TestNoPortsParsed(t *testing.T) { @@ -132,10 +162,13 @@ func TestNoPortsParsed(t *testing.T) { // test ports, err := adapters.ConfigToReceiverPorts(logger, config) + cPorts, cErr := adapters.ConfigToContainerPorts(logger, config) // verify assert.Nil(t, ports) assert.Equal(t, tt.expected, err) + assert.Nil(t, cPorts) + assert.Equal(t, tt.expected, cErr) }) } } @@ -205,7 +238,8 @@ func TestParserFailed(t *testing.T) { } type mockParser struct { - portsFunc func() ([]corev1.ServicePort, error) + portsFunc func() ([]corev1.ServicePort, error) + containerPortsFunc func() ([]corev1.ContainerPort, error) } func (m *mockParser) Ports() ([]corev1.ServicePort, error) { @@ -216,6 +250,14 @@ func (m *mockParser) Ports() ([]corev1.ServicePort, error) { return nil, nil } +func (m *mockParser) ContainerPorts() ([]corev1.ContainerPort, error) { + if m.containerPortsFunc != nil { + return m.containerPortsFunc() + } + + return nil, nil +} + func (m *mockParser) ParserName() string { return "__mock-adapters" } diff --git a/pkg/collector/container.go b/pkg/collector/container.go index 5c0548feb4..1344720ca3 100644 --- a/pkg/collector/container.go +++ b/pkg/collector/container.go @@ -15,10 +15,16 @@ package collector import ( + "errors" "fmt" + "net" + "sort" + "strconv" "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/validation" "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" "github.com/open-telemetry/opentelemetry-operator/internal/config" @@ -26,6 +32,10 @@ import ( "github.com/open-telemetry/opentelemetry-operator/pkg/naming" ) +const maxPortLen = 15 + +var errInvalidPort = errors.New("invalid port name/num") + // Container builds a container for the given collector. func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelemetryCollector) corev1.Container { image := otelcol.Spec.Image @@ -33,6 +43,25 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem image = cfg.CollectorImage() } + // build port map + ports := getConfigContainerPorts(logger, otelcol.Spec.Config) + for _, p := range otelcol.Spec.ContainerPorts { + truncName := naming.Truncate(p.Name, maxPortLen) + if p.Name != truncName { + logger.Info("truncating container port name", + "port.name.prev", p.Name, "port.name.new", truncName) + p.Name = truncName + } + nameErrs := validation.IsValidPortName(p.Name) + numErrs := validation.IsValidPortNum(int(p.ContainerPort)) + if len(nameErrs) > 0 || len(numErrs) > 0 { + logger.Error(errInvalidPort, "dropping container port", "port.name", p.Name, "port.num", p.ContainerPort, + "port.name.errs", nameErrs, "num.errs", numErrs) + continue + } + ports[p.Name] = p + } + argsMap := otelcol.Spec.Args if argsMap == nil { argsMap = map[string]string{} @@ -89,6 +118,7 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem Name: naming.Container(), Image: image, ImagePullPolicy: otelcol.Spec.ImagePullPolicy, + Ports: portMapToList(ports), VolumeMounts: volumeMounts, Args: args, Env: envVars, @@ -98,3 +128,89 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem LivenessProbe: livenessProbe, } } + +func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.ContainerPort { + ports := map[string]corev1.ContainerPort{} + c, err := adapters.ConfigFromString(cfg) + if err != nil { + logger.Error(err, "couldn't extract the configuration") + return ports + } + ps, err := adapters.ConfigToContainerPorts(logger, c) + if err != nil { + logger.Error(err, "couldn't build container ports from configuration") + } else { + for _, p := range ps { + truncName := naming.Truncate(p.Name, maxPortLen) + if p.Name != truncName { + logger.Info("truncating container port name", + "port.name.prev", p.Name, "port.name.new", truncName) + p.Name = truncName + } + nameErrs := validation.IsValidPortName(p.Name) + numErrs := validation.IsValidPortNum(int(p.ContainerPort)) + if len(nameErrs) > 0 || len(numErrs) > 0 { + logger.Error(errInvalidPort, "dropping container port", "port.name", p.Name, "port.num", p.ContainerPort, + "port.name.errs", nameErrs, "num.errs", numErrs) + continue + } + ports[p.Name] = p + } + } + + metricsPort, err := getMetricsPort(c) + if err != nil { + logger.Error(err, "couldn't determine metrics port from configuration, using 8888 default value") + metricsPort = 8888 + } + ports["metrics"] = corev1.ContainerPort{ + Name: "metrics", + ContainerPort: metricsPort, + Protocol: corev1.ProtocolTCP, + } + return ports +} + +func getMetricsPort(c map[interface{}]interface{}) (int32, error) { + // we don't need to unmarshal the whole config, just follow the keys down to + // the metrics address. + type metricsCfg struct { + Address string + } + type telemetryCfg struct { + Metrics metricsCfg + } + type serviceCfg struct { + Telemetry telemetryCfg + } + type cfg struct { + Service serviceCfg + } + var cOut cfg + err := mapstructure.Decode(c, &cOut) + if err != nil { + return 0, err + } + + _, port, err := net.SplitHostPort(cOut.Service.Telemetry.Metrics.Address) + if err != nil { + return 0, err + } + i64, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return 0, err + } + + return int32(i64), nil +} + +func portMapToList(portMap map[string]corev1.ContainerPort) []corev1.ContainerPort { + ports := make([]corev1.ContainerPort, 0, len(portMap)) + for _, p := range portMap { + ports = append(ports, p) + } + sort.Slice(ports, func(i, j int) bool { + return ports[i].Name < ports[j].Name + }) + return ports +} diff --git a/pkg/collector/container_test.go b/pkg/collector/container_test.go index 60820c9c2b..fdd0ed36cd 100644 --- a/pkg/collector/container_test.go +++ b/pkg/collector/container_test.go @@ -29,6 +29,12 @@ import ( var logger = logf.Log.WithName("unit-tests") +var metricContainerPort = corev1.ContainerPort{ + Name: "metrics", + ContainerPort: 8888, + Protocol: corev1.ProtocolTCP, +} + func TestContainerNewDefault(t *testing.T) { // prepare otelcol := v1alpha1.OpenTelemetryCollector{} @@ -39,6 +45,7 @@ func TestContainerNewDefault(t *testing.T) { // verify assert.Equal(t, "default-image", c.Image) + assert.Equal(t, []corev1.ContainerPort{metricContainerPort}, c.Ports) } func TestContainerWithImageOverridden(t *testing.T) { @@ -57,6 +64,184 @@ func TestContainerWithImageOverridden(t *testing.T) { assert.Equal(t, "overridden-image", c.Image) } +func TestContainerPorts(t *testing.T) { + var goodConfig = `receivers: + examplereceiver: + endpoint: "0.0.0.0:12345" +service: + pipelines: + metrics: + receivers: [examplereceiver] + exporters: [logging] +` + + tests := []struct { + description string + specConfig string + specPorts []corev1.ContainerPort + expectedPorts []corev1.ContainerPort + }{ + { + description: "bad spec config", + specConfig: "🦄", + specPorts: nil, + expectedPorts: []corev1.ContainerPort{}, + }, + { + description: "couldn't build ports from spec config", + specConfig: "", + specPorts: nil, + expectedPorts: []corev1.ContainerPort{metricContainerPort}, + }, + { + description: "ports in spec Config", + specConfig: goodConfig, + specPorts: nil, + expectedPorts: []corev1.ContainerPort{ + { + Name: "examplereceiver", + ContainerPort: 12345, + }, + metricContainerPort, + }, + }, + { + description: "ports in spec ContainerPorts", + specPorts: []corev1.ContainerPort{ + { + Name: "testport1", + ContainerPort: 12345, + }, + }, + expectedPorts: []corev1.ContainerPort{ + metricContainerPort, + { + Name: "testport1", + ContainerPort: 12345, + }, + }, + }, + { + description: "ports in spec Config and ContainerPorts", + specConfig: goodConfig, + specPorts: []corev1.ContainerPort{ + { + Name: "testport1", + ContainerPort: 12345, + }, + { + Name: "testport2", + ContainerPort: 54321, + Protocol: corev1.ProtocolUDP, + }, + }, + expectedPorts: []corev1.ContainerPort{ + { + Name: "examplereceiver", + ContainerPort: 12345, + }, + metricContainerPort, + { + Name: "testport1", + ContainerPort: 12345, + }, + { + Name: "testport2", + ContainerPort: 54321, + Protocol: corev1.ProtocolUDP, + }, + }, + }, + { + description: "invalid port name", + specConfig: goodConfig, + specPorts: []corev1.ContainerPort{ + { + // this port name contains a non alphanumeric character, which is invalid. + Name: "-test🦄port", + ContainerPort: 12345, + Protocol: corev1.ProtocolTCP, + }, + }, + expectedPorts: []corev1.ContainerPort{ + { + Name: "examplereceiver", + ContainerPort: 12345, + }, + metricContainerPort, + }, + }, + { + description: "long port name", + specConfig: goodConfig, + specPorts: []corev1.ContainerPort{ + { + // this port name is longer than 15 characters, which is invalid. + Name: "testportaaaabbbb", + ContainerPort: 5, + Protocol: corev1.ProtocolTCP, + }, + }, + expectedPorts: []corev1.ContainerPort{ + { + Name: "examplereceiver", + ContainerPort: 12345, + }, + metricContainerPort, + { + Name: "testportaaaabbb", + ContainerPort: 5, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + { + description: "duplicate port name", + specConfig: goodConfig, + specPorts: []corev1.ContainerPort{ + { + Name: "testport1", + ContainerPort: 12345, + }, + { + Name: "testport1", + ContainerPort: 11111, + }, + }, + expectedPorts: []corev1.ContainerPort{ + { + Name: "examplereceiver", + ContainerPort: 12345, + }, + metricContainerPort, + { + Name: "testport1", + ContainerPort: 11111, + }, + }, + }, + } + + for _, testCase := range tests { + t.Run(testCase.description, func(t *testing.T) { + // prepare + otelcol := v1alpha1.OpenTelemetryCollector{ + Spec: v1alpha1.OpenTelemetryCollectorSpec{ + Config: testCase.specConfig, + ContainerPorts: testCase.specPorts, + }, + } + cfg := config.New(config.WithCollectorImage("default-image")) + + // test + c := Container(cfg, logger, otelcol) + + // verify + assert.ElementsMatch(t, testCase.expectedPorts, c.Ports) + }) + } +} + func TestContainerConfigFlagIsIgnored(t *testing.T) { // prepare otelcol := v1alpha1.OpenTelemetryCollector{ diff --git a/pkg/collector/parser/receiver.go b/pkg/collector/parser/receiver.go index ef950c89b7..6c1fc5c523 100644 --- a/pkg/collector/parser/receiver.go +++ b/pkg/collector/parser/receiver.go @@ -37,6 +37,9 @@ type ReceiverParser interface { // Ports returns the service ports parsed based on the receiver's configuration Ports() ([]corev1.ServicePort, error) + // ContainerPorts returns the container ports parsed based on the receiver's configuration. + ContainerPorts() ([]corev1.ContainerPort, error) + // ParserName returns the name of this parser ParserName() string } @@ -133,6 +136,60 @@ func singlePortFromConfigEndpoint(logger logr.Logger, name string, config map[in return nil } +func singleContainerPortFromConfigEndpoint(logger logr.Logger, name string, config map[interface{}]interface{}) *v1.ContainerPort { + var endpoint interface{} + switch { + // syslog receiver contains the endpoint + // that needs to be exposed one level down inside config + // i.e. either in tcp or udp section with field key + // as `listen_address` + case name == "syslog": + var c map[interface{}]interface{} + if udp, isUDP := config["udp"]; isUDP && udp != nil { + c = udp.(map[interface{}]interface{}) + endpoint = getAddressFromConfig(logger, name, listenAddressKey, c) + } else if tcp, isTCP := config["tcp"]; isTCP && tcp != nil { + c = tcp.(map[interface{}]interface{}) + endpoint = getAddressFromConfig(logger, name, listenAddressKey, c) + } + + // tcplog and udplog receivers hold the endpoint + // value in `listen_address` field + case name == "tcplog" || name == "udplog": + endpoint = getAddressFromConfig(logger, name, listenAddressKey, config) + + // ignore kubeletstats receiver as it holds the field key endpoint, and it + // is a scraper, we only expose endpoint through k8s service objects for + // receivers that aren't scrapers. + case name == "kubeletstats": + return nil + + // ignore prometheus receiver as it has no listening endpoint + case name == "prometheus": + return nil + + default: + endpoint = getAddressFromConfig(logger, name, endpointKey, config) + } + + switch endpoint := endpoint.(type) { + case string: + port, err := portFromEndpoint(endpoint) + if err != nil { + logger.WithValues(endpointKey, endpoint).Info("couldn't parse the endpoint's port") + return nil + } + + return &corev1.ContainerPort{ + Name: portName(name, port), + ContainerPort: port, + } + default: + logger.Info("receiver's endpoint isn't a string") + } + + return nil +} func getAddressFromConfig(logger logr.Logger, name, key string, config map[interface{}]interface{}) interface{} { endpoint, ok := config[key] diff --git a/pkg/collector/parser/receiver_generic.go b/pkg/collector/parser/receiver_generic.go index 21b4e2ce53..7751cde05a 100644 --- a/pkg/collector/parser/receiver_generic.go +++ b/pkg/collector/parser/receiver_generic.go @@ -68,6 +68,25 @@ func (g *GenericReceiver) Ports() ([]corev1.ServicePort, error) { return []corev1.ServicePort{}, nil } +// ContainerPorts returns all the container ports for all protocols in this parser. +func (g *GenericReceiver) ContainerPorts() ([]corev1.ContainerPort, error) { + port := singleContainerPortFromConfigEndpoint(g.logger, g.name, g.config) + if port != nil { + port.Protocol = g.defaultProtocol + return []corev1.ContainerPort{*port}, nil + } + + if g.defaultPort > 0 { + return []corev1.ContainerPort{{ + ContainerPort: g.defaultPort, + Name: portName(g.name, g.defaultPort), + Protocol: g.defaultProtocol, + }}, nil + } + + return []corev1.ContainerPort{}, nil +} + // ParserName returns the name of this parser. func (g *GenericReceiver) ParserName() string { return g.parserName diff --git a/pkg/collector/parser/receiver_generic_test.go b/pkg/collector/parser/receiver_generic_test.go index 15fd903591..c3e5132403 100644 --- a/pkg/collector/parser/receiver_generic_test.go +++ b/pkg/collector/parser/receiver_generic_test.go @@ -33,13 +33,25 @@ func TestParseEndpoint(t *testing.T) { "endpoint": "0.0.0.0:1234", }) - // test - ports, err := builder.Ports() + t.Run("service port parsed", func(t *testing.T) { + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].Port) + }) + + t.Run("container port parsed", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 1234, ports[0].Port) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].ContainerPort) + }) } func TestFailedToParseEndpoint(t *testing.T) { @@ -49,12 +61,23 @@ func TestFailedToParseEndpoint(t *testing.T) { "endpoint": "0.0.0.0", }) - // test - ports, err := builder.Ports() + t.Run("no service ports", func(t *testing.T) { + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) + + t.Run("no container ports", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 0) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) } func TestDownstreamParsers(t *testing.T) { @@ -90,7 +113,7 @@ func TestDownstreamParsers(t *testing.T) { assert.Equal(t, tt.parserName, builder.ParserName()) }) - t.Run("assigns the expected port", func(t *testing.T) { + t.Run("assigns the expected service port", func(t *testing.T) { // prepare builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{}) @@ -104,7 +127,21 @@ func TestDownstreamParsers(t *testing.T) { assert.Equal(t, tt.receiverName, ports[0].Name) }) - t.Run("allows port to be overridden", func(t *testing.T) { + t.Run("assigns the expected container port", func(t *testing.T) { + // prepare + builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{}) + + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, tt.defaultPort, ports[0].ContainerPort) + assert.Equal(t, tt.receiverName, ports[0].Name) + }) + + t.Run("allows service port to be overridden", func(t *testing.T) { // prepare builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{ "endpoint": "0.0.0.0:65535", @@ -119,6 +156,22 @@ func TestDownstreamParsers(t *testing.T) { assert.EqualValues(t, 65535, ports[0].Port) assert.Equal(t, tt.receiverName, ports[0].Name) }) + + t.Run("allows container port to be overridden", func(t *testing.T) { + // prepare + builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{ + "endpoint": "0.0.0.0:65535", + }) + + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 65535, ports[0].ContainerPort) + assert.Equal(t, tt.receiverName, ports[0].Name) + }) }) } } diff --git a/pkg/collector/parser/receiver_jaeger.go b/pkg/collector/parser/receiver_jaeger.go index 428938c313..14286b2d11 100644 --- a/pkg/collector/parser/receiver_jaeger.go +++ b/pkg/collector/parser/receiver_jaeger.go @@ -125,6 +125,71 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) { return ports, nil } +// ContainerPorts returns all the container ports for all protocols in this parser. +func (j *JaegerReceiverParser) ContainerPorts() ([]corev1.ContainerPort, error) { + ports := []corev1.ContainerPort{} + + for _, protocol := range []struct { + name string + transportProtocol corev1.Protocol + appProtocol string + defaultPort int32 + }{ + { + name: "grpc", + defaultPort: defaultGRPCPort, + transportProtocol: corev1.ProtocolTCP, + appProtocol: "grpc", + }, + { + name: "thrift_http", + defaultPort: defaultThriftHTTPPort, + transportProtocol: corev1.ProtocolTCP, + appProtocol: "http", + }, + { + name: "thrift_compact", + defaultPort: defaultThriftCompactPort, + transportProtocol: corev1.ProtocolUDP, + }, + { + name: "thrift_binary", + defaultPort: defaultThriftBinaryPort, + transportProtocol: corev1.ProtocolUDP, + }, + } { + // do we have the protocol specified at all? + if receiverProtocol, ok := j.config[protocol.name]; ok { + // we have the specified protocol, we definitely need a container port + nameWithProtocol := fmt.Sprintf("%s-%s", j.name, protocol.name) + var protocolPort *corev1.ContainerPort + + // do we have a configuration block for the protocol? + settings, ok := receiverProtocol.(map[interface{}]interface{}) + if ok { + protocolPort = singleContainerPortFromConfigEndpoint(j.logger, nameWithProtocol, settings) + } + + // have we parsed a port based on the configuration block? + // if not, we use the default port + if protocolPort == nil { + protocolPort = &corev1.ContainerPort{ + Name: portName(nameWithProtocol, protocol.defaultPort), + ContainerPort: protocol.defaultPort, + } + } + + // set the appropriate transport protocol (i.e. TCP/UDP) for this kind of receiver protocol + protocolPort.Protocol = protocol.transportProtocol + + // at this point, we *have* a port specified, add it to the list of ports + ports = append(ports, *protocolPort) + } + } + + return ports, nil +} + // ParserName returns the name of this parser. func (j *JaegerReceiverParser) ParserName() string { return parserNameJaeger diff --git a/pkg/collector/parser/receiver_jaeger_test.go b/pkg/collector/parser/receiver_jaeger_test.go index 827c988c4a..7af96ba174 100644 --- a/pkg/collector/parser/receiver_jaeger_test.go +++ b/pkg/collector/parser/receiver_jaeger_test.go @@ -42,14 +42,27 @@ func TestJaegerMinimalConfiguration(t *testing.T) { }, }) - // test - ports, err := builder.Ports() + t.Run("service port exists", func(t *testing.T) { + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 14250, ports[0].Port) + assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) + }) - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 14250, ports[0].Port) - assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) + t.Run("container port exists", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 14250, ports[0].ContainerPort) + assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) + }) } func TestJaegerPortsOverridden(t *testing.T) { @@ -62,14 +75,27 @@ func TestJaegerPortsOverridden(t *testing.T) { }, }) - // test - ports, err := builder.Ports() + t.Run("service ports overridden", func(t *testing.T) { + // test + ports, err := builder.Ports() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 1234, ports[0].Port) - assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].Port) + assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) + }) + + t.Run("container ports overridden", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].ContainerPort) + assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) + }) } func TestJaegerExposeDefaultPorts(t *testing.T) { @@ -86,7 +112,6 @@ func TestJaegerExposeDefaultPorts(t *testing.T) { expectedResults := map[string]struct { transportProtocol corev1.Protocol portNumber int32 - seen bool }{ "jaeger-grpc": {portNumber: 14250, transportProtocol: corev1.ProtocolTCP}, "jaeger-thrift-http": {portNumber: 14268, transportProtocol: corev1.ProtocolTCP}, @@ -94,21 +119,45 @@ func TestJaegerExposeDefaultPorts(t *testing.T) { "jaeger-thrift-binary": {portNumber: 6832, transportProtocol: corev1.ProtocolUDP}, } - // test - ports, err := builder.Ports() + t.Run("service ports exposed", func(t *testing.T) { + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 4) + + seen := map[string]bool{} + for _, port := range ports { + r, ok := expectedResults[port.Name] + seen[port.Name] = true + assert.True(t, ok, "unexpected service port %s", port.Name) + assert.EqualValues(t, r.portNumber, port.Port) + assert.EqualValues(t, r.transportProtocol, port.Protocol) + } + for k := range expectedResults { + assert.True(t, seen[k], "the port %s wasn't included in the service ports", k) + } + }) - // verify - assert.NoError(t, err) - assert.Len(t, ports, 4) - - for _, port := range ports { - r := expectedResults[port.Name] - r.seen = true - expectedResults[port.Name] = r - assert.EqualValues(t, r.portNumber, port.Port) - assert.EqualValues(t, r.transportProtocol, port.Protocol) - } - for k, v := range expectedResults { - assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) - } + t.Run("container ports exposed", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 4) + + seen := map[string]bool{} + for _, port := range ports { + r, ok := expectedResults[port.Name] + seen[port.Name] = true + assert.True(t, ok, "unexpected container port %s", port.Name) + assert.EqualValues(t, r.portNumber, port.ContainerPort) + assert.EqualValues(t, r.transportProtocol, port.Protocol) + } + for k := range expectedResults { + assert.True(t, seen[k], "the port %s wasn't included in the container ports", k) + } + }) } diff --git a/pkg/collector/parser/receiver_otlp.go b/pkg/collector/parser/receiver_otlp.go index 2fa208fe94..7fa00223c3 100644 --- a/pkg/collector/parser/receiver_otlp.go +++ b/pkg/collector/parser/receiver_otlp.go @@ -130,6 +130,66 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) { return ports, nil } +// ContainerPorts returns all the container ports for all protocols in this parser. +func (o *OTLPReceiverParser) ContainerPorts() ([]corev1.ContainerPort, error) { + ports := []corev1.ContainerPort{} + + for _, protocol := range []struct { + name string + defaultPorts []corev1.ContainerPort + }{ + { + name: grpc, + defaultPorts: []corev1.ContainerPort{ + { + Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort), + ContainerPort: defaultOTLPGRPCPort, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + { + name: http, + defaultPorts: []corev1.ContainerPort{ + { + Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort), + ContainerPort: defaultOTLPHTTPPort, + Protocol: corev1.ProtocolTCP, + }, + { + Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort), + ContainerPort: defaultOTLPHTTPLegacyPort, + Protocol: corev1.ProtocolTCP, + }, + }, + }, + } { + // do we have the protocol specified at all? + if receiverProtocol, ok := o.config[protocol.name]; ok { + // we have the specified protocol, we definitely need a container port + nameWithProtocol := fmt.Sprintf("%s-%s", o.name, protocol.name) + var protocolPort *corev1.ContainerPort + + // do we have a configuration block for the protocol? + settings, ok := receiverProtocol.(map[interface{}]interface{}) + if ok { + protocolPort = singleContainerPortFromConfigEndpoint(o.logger, nameWithProtocol, settings) + } + + // have we parsed a port based on the configuration block? + // if not, we use the default port + if protocolPort == nil { + ports = append(ports, protocol.defaultPorts...) + } else { + protocolPort.Protocol = corev1.ProtocolTCP + ports = append(ports, *protocolPort) + } + } + } + + return ports, nil +} + // ParserName returns the name of this parser. func (o *OTLPReceiverParser) ParserName() string { return parserNameOTLP diff --git a/pkg/collector/parser/receiver_otlp_test.go b/pkg/collector/parser/receiver_otlp_test.go index 6abca3b5d1..da98ed88df 100644 --- a/pkg/collector/parser/receiver_otlp_test.go +++ b/pkg/collector/parser/receiver_otlp_test.go @@ -48,28 +48,50 @@ func TestOTLPPortsOverridden(t *testing.T) { expectedResults := map[string]struct { portNumber int32 - seen bool }{ "otlp-grpc": {portNumber: 1234}, "otlp-http": {portNumber: 1235}, } - // test - ports, err := builder.Ports() + t.Run("service ports overridden", func(t *testing.T) { + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + seen := map[string]bool{} + for _, port := range ports { + r, ok := expectedResults[port.Name] + seen[port.Name] = true + assert.True(t, ok, "unexpected service port %s", port.Name) + assert.EqualValues(t, r.portNumber, port.Port) + } + for k := range expectedResults { + assert.True(t, seen[k], "the port %s wasn't included in the service ports", k) + } + }) - // verify - assert.NoError(t, err) - assert.Len(t, ports, len(expectedResults)) - - for _, port := range ports { - r := expectedResults[port.Name] - r.seen = true - expectedResults[port.Name] = r - assert.EqualValues(t, r.portNumber, port.Port) - } - for k, v := range expectedResults { - assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) - } + t.Run("container ports overridden", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + seen := map[string]bool{} + for _, port := range ports { + r, ok := expectedResults[port.Name] + seen[port.Name] = true + assert.True(t, ok, "unexpected container port %s", port.Name) + assert.EqualValues(t, r.portNumber, port.ContainerPort) + } + for k := range expectedResults { + assert.True(t, seen[k], "the port %s wasn't included in the container ports", k) + } + }) } func TestOTLPExposeDefaultPorts(t *testing.T) { @@ -83,27 +105,49 @@ func TestOTLPExposeDefaultPorts(t *testing.T) { expectedResults := map[string]struct { portNumber int32 - seen bool }{ "otlp-grpc": {portNumber: 4317}, "otlp-http": {portNumber: 4318}, "otlp-http-legacy": {portNumber: 55681}, } - // test - ports, err := builder.Ports() + t.Run("service ports exposed", func(t *testing.T) { + // test + ports, err := builder.Ports() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + seen := map[string]bool{} + for _, port := range ports { + r, ok := expectedResults[port.Name] + seen[port.Name] = true + assert.True(t, ok, "unexpected service port %s", port.Name) + assert.EqualValues(t, r.portNumber, port.Port) + } + for k := range expectedResults { + assert.True(t, seen[k], "the port %s wasn't included in the service ports", k) + } + }) - // verify - assert.NoError(t, err) - assert.Len(t, ports, len(expectedResults)) - - for _, port := range ports { - r := expectedResults[port.Name] - r.seen = true - expectedResults[port.Name] = r - assert.EqualValues(t, r.portNumber, port.Port) - } - for k, v := range expectedResults { - assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) - } + t.Run("container ports exposed", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + seen := map[string]bool{} + for _, port := range ports { + r, ok := expectedResults[port.Name] + seen[port.Name] = true + assert.True(t, ok, "unexpected container port %s", port.Name) + assert.EqualValues(t, r.portNumber, port.ContainerPort) + } + for k := range expectedResults { + assert.True(t, seen[k], "the port %s wasn't included in the container ports", k) + } + }) } diff --git a/pkg/collector/parser/receiver_test.go b/pkg/collector/parser/receiver_test.go index 2566f7a7be..5acb361631 100644 --- a/pkg/collector/parser/receiver_test.go +++ b/pkg/collector/parser/receiver_test.go @@ -92,11 +92,21 @@ func TestReceiverFailsWhenPortIsntString(t *testing.T) { "endpoint": 123, } - // test - p := singlePortFromConfigEndpoint(logger, "myreceiver", config) + t.Run("service port fails", func(t *testing.T) { + // test + p := singlePortFromConfigEndpoint(logger, "myreceiver", config) - // verify - assert.Nil(t, p) + // verify + assert.Nil(t, p) + }) + + t.Run("container port fails", func(t *testing.T) { + // test + p := singleContainerPortFromConfigEndpoint(logger, "myreceiver", config) + + // verify + assert.Nil(t, p) + }) } func TestIgnorekubeletstatsEndpoint(t *testing.T) { @@ -106,12 +116,23 @@ func TestIgnorekubeletstatsEndpoint(t *testing.T) { "endpoint": "0.0.0.0:9000", }) - // test - ports, err := builder.Ports() + t.Run("no service ports", func(t *testing.T) { + // test + ports, err := builder.Ports() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 0) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) + + t.Run("no container ports", func(t *testing.T) { + // test + ports, err := builder.ContainerPorts() + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) + }) } func TestReceiverFallbackWhenNotRegistered(t *testing.T) { @@ -144,6 +165,10 @@ func (m *mockParser) Ports() ([]corev1.ServicePort, error) { return nil, nil } +func (m *mockParser) ContainerPorts() ([]corev1.ContainerPort, error) { + return nil, nil +} + func (m *mockParser) ParserName() string { return "__mock" } From 5ffaf6a6ea9a790c25b54148a19b3274e918508d Mon Sep 17 00:00:00 2001 From: Kristina Pathak Date: Mon, 19 Sep 2022 19:07:26 -0700 Subject: [PATCH 2/4] get container ports from service ports --- apis/v1alpha1/opentelemetrycollector_types.go | 9 +- .../opentelemetrycollector_webhook.go | 11 ++ ...ntelemetry.io_opentelemetrycollectors.yaml | 45 +------ ...ntelemetry.io_opentelemetrycollectors.yaml | 45 +------ docs/api.md | 70 +---------- pkg/collector/adapters/config_to_ports.go | 66 ----------- .../adapters/config_to_ports_test.go | 42 +------ pkg/collector/container.go | 39 +++--- pkg/collector/container_test.go | 50 ++++---- pkg/collector/parser/receiver.go | 57 --------- pkg/collector/parser/receiver_generic.go | 19 --- pkg/collector/parser/receiver_generic_test.go | 79 ++----------- pkg/collector/parser/receiver_jaeger.go | 65 ---------- pkg/collector/parser/receiver_jaeger_test.go | 111 +++++------------- pkg/collector/parser/receiver_otlp.go | 60 ---------- pkg/collector/parser/receiver_otlp_test.go | 108 +++++------------ pkg/collector/parser/receiver_test.go | 43 ++----- 17 files changed, 144 insertions(+), 775 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_types.go b/apis/v1alpha1/opentelemetrycollector_types.go index 0f3a982867..8688ca69be 100644 --- a/apis/v1alpha1/opentelemetrycollector_types.go +++ b/apis/v1alpha1/opentelemetrycollector_types.go @@ -83,17 +83,10 @@ type OpenTelemetryCollectorSpec struct { VolumeMounts []v1.VolumeMount `json:"volumeMounts,omitempty"` // Ports allows a set of ports to be exposed by the underlying v1.Service. By default, the operator // will attempt to infer the required ports by parsing the .Spec.Config property but this property can be - // used to open aditional ports that can't be inferred by the operator, like for custom receivers. + // used to open additional ports that can't be inferred by the operator, like for custom receivers. // +optional // +listType=atomic Ports []v1.ServicePort `json:"ports,omitempty"` - // ContainerPorts allows a set of ports to be exposed by the underlying v1.Container. By default, the operator - // will attempt to infer the required ports by parsing the .Spec.Config property but this property can be - // used to open additional ports that can't be inferred by the operator, like for custom receivers. Any ports - // specified here will overwrite inferred ports if they have the same name. - // +optional - // +listType=atomic - ContainerPorts []v1.ContainerPort `json:"containerPorts,omitempty"` // ENV vars to set on the OpenTelemetry Collector's Pods. These can then in certain cases be // consumed in the config file for the Collector. // +optional diff --git a/apis/v1alpha1/opentelemetrycollector_webhook.go b/apis/v1alpha1/opentelemetrycollector_webhook.go index 78e9719628..12b6d98904 100644 --- a/apis/v1alpha1/opentelemetrycollector_webhook.go +++ b/apis/v1alpha1/opentelemetrycollector_webhook.go @@ -16,6 +16,7 @@ package v1alpha1 import ( "fmt" + "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -120,6 +121,16 @@ func (r *OpenTelemetryCollector) validateCRDSpec() error { } } + // validator port config + for _, p := range r.Spec.Ports { + nameErrs := validation.IsValidPortName(p.Name) + numErrs := validation.IsValidPortNum(int(p.Port)) + if len(nameErrs) > 0 || len(numErrs) > 0 { + return fmt.Errorf("the OpenTelemetry Spec Ports configuration is incorrect, port name '%s' errors: %s, num '%d' errors: %s", + p.Name, nameErrs, p.Port, numErrs) + } + } + // validate autoscale with horizontal pod autoscaler if r.Spec.MaxReplicas != nil { if *r.Spec.MaxReplicas < int32(1) { diff --git a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml index 9ca8e35940..d9efcacdde 100644 --- a/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml +++ b/bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml @@ -194,49 +194,6 @@ spec: configuration. Refer to the OpenTelemetry Collector documentation for details. type: string - containerPorts: - description: ContainerPorts allows a set of ports to be exposed by - the underlying v1.Container. By default, the operator will attempt - to infer the required ports by parsing the .Spec.Config property - but this property can be used to open additional ports that can't - be inferred by the operator, like for custom receivers. Any ports - specified here will overwrite inferred ports if they have the same - name. - items: - description: ContainerPort represents a network port in a single - container. - properties: - containerPort: - description: Number of port to expose on the pod's IP address. - This must be a valid port number, 0 < x < 65536. - format: int32 - type: integer - hostIP: - description: What host IP to bind the external port to. - type: string - hostPort: - description: Number of port to expose on the host. If specified, - this must be a valid port number, 0 < x < 65536. If HostNetwork - is specified, this must match ContainerPort. Most containers - do not need this. - format: int32 - type: integer - name: - description: If specified, this must be an IANA_SVC_NAME and - unique within the pod. Each named port in a pod must have - a unique name. Name for the port that can be referred to by - services. - type: string - protocol: - default: TCP - description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults - to "TCP". - type: string - required: - - containerPort - type: object - type: array - x-kubernetes-list-type: atomic env: description: ENV vars to set on the OpenTelemetry Collector's Pods. These can then in certain cases be consumed in the config file for @@ -599,7 +556,7 @@ spec: description: Ports allows a set of ports to be exposed by the underlying v1.Service. By default, the operator will attempt to infer the required ports by parsing the .Spec.Config property but this property can - be used to open aditional ports that can't be inferred by the operator, + be used to open additional ports that can't be inferred by the operator, like for custom receivers. items: description: ServicePort contains information on service's port. diff --git a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml index 8f709b8ece..16a0124882 100644 --- a/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml +++ b/config/crd/bases/opentelemetry.io_opentelemetrycollectors.yaml @@ -192,49 +192,6 @@ spec: configuration. Refer to the OpenTelemetry Collector documentation for details. type: string - containerPorts: - description: ContainerPorts allows a set of ports to be exposed by - the underlying v1.Container. By default, the operator will attempt - to infer the required ports by parsing the .Spec.Config property - but this property can be used to open additional ports that can't - be inferred by the operator, like for custom receivers. Any ports - specified here will overwrite inferred ports if they have the same - name. - items: - description: ContainerPort represents a network port in a single - container. - properties: - containerPort: - description: Number of port to expose on the pod's IP address. - This must be a valid port number, 0 < x < 65536. - format: int32 - type: integer - hostIP: - description: What host IP to bind the external port to. - type: string - hostPort: - description: Number of port to expose on the host. If specified, - this must be a valid port number, 0 < x < 65536. If HostNetwork - is specified, this must match ContainerPort. Most containers - do not need this. - format: int32 - type: integer - name: - description: If specified, this must be an IANA_SVC_NAME and - unique within the pod. Each named port in a pod must have - a unique name. Name for the port that can be referred to by - services. - type: string - protocol: - default: TCP - description: Protocol for port. Must be UDP, TCP, or SCTP. Defaults - to "TCP". - type: string - required: - - containerPort - type: object - type: array - x-kubernetes-list-type: atomic env: description: ENV vars to set on the OpenTelemetry Collector's Pods. These can then in certain cases be consumed in the config file for @@ -597,7 +554,7 @@ spec: description: Ports allows a set of ports to be exposed by the underlying v1.Service. By default, the operator will attempt to infer the required ports by parsing the .Spec.Config property but this property can - be used to open aditional ports that can't be inferred by the operator, + be used to open additional ports that can't be inferred by the operator, like for custom receivers. items: description: ServicePort contains information on service's port. diff --git a/docs/api.md b/docs/api.md index 23987bb7ce..aa9e100583 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1705,13 +1705,6 @@ OpenTelemetryCollectorSpec defines the desired state of OpenTelemetryCollector. Config is the raw JSON to be used as the collector's configuration. Refer to the OpenTelemetry Collector documentation for details.
false - - containerPorts - []object - - ContainerPorts allows a set of ports to be exposed by the underlying v1.Container. By default, the operator will attempt to infer the required ports by parsing the .Spec.Config property but this property can be used to open additional ports that can't be inferred by the operator, like for custom receivers. Any ports specified here will overwrite inferred ports if they have the same name.
- - false env []object @@ -1799,7 +1792,7 @@ OpenTelemetryCollectorSpec defines the desired state of OpenTelemetryCollector. ports []object - Ports allows a set of ports to be exposed by the underlying v1.Service. By default, the operator will attempt to infer the required ports by parsing the .Spec.Config property but this property can be used to open aditional ports that can't be inferred by the operator, like for custom receivers.
+ Ports allows a set of ports to be exposed by the underlying v1.Service. By default, the operator will attempt to infer the required ports by parsing the .Spec.Config property but this property can be used to open additional ports that can't be inferred by the operator, like for custom receivers.
false @@ -2126,67 +2119,6 @@ HPAScalingPolicy is a single policy which must hold true for a specified past in -### OpenTelemetryCollector.spec.containerPorts[index] -[↩ Parent](#opentelemetrycollectorspec) - - - -ContainerPort represents a network port in a single container. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
NameTypeDescriptionRequired
containerPortinteger - Number of port to expose on the pod's IP address. This must be a valid port number, 0 < x < 65536.
-
- Format: int32
-
true
hostIPstring - What host IP to bind the external port to.
-
false
hostPortinteger - Number of port to expose on the host. If specified, this must be a valid port number, 0 < x < 65536. If HostNetwork is specified, this must match ContainerPort. Most containers do not need this.
-
- Format: int32
-
false
namestring - If specified, this must be an IANA_SVC_NAME and unique within the pod. Each named port in a pod must have a unique name. Name for the port that can be referred to by services.
-
false
protocolstring - Protocol for port. Must be UDP, TCP, or SCTP. Defaults to "TCP".
-
- Default: TCP
-
false
- - ### OpenTelemetryCollector.spec.env[index] [↩ Parent](#opentelemetrycollectorspec) diff --git a/pkg/collector/adapters/config_to_ports.go b/pkg/collector/adapters/config_to_ports.go index 89c5269a28..c897bec832 100644 --- a/pkg/collector/adapters/config_to_ports.go +++ b/pkg/collector/adapters/config_to_ports.go @@ -97,69 +97,3 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{ return ports, nil } - -// ConfigToContainerPorts converts the incoming configuration object into a set of container ports. -func ConfigToContainerPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ContainerPort, error) { - // now, we gather which ports we might need to open - // for that, we get all the receivers and check their `endpoint` properties, - // extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern: - // ${instance.Name}-${receiver.name}-${receiver.qualifier} - // the receiver-name is typically the node name from the receivers map - // the receiver-qualifier is what comes after the slash in the receiver name, but typically nil - // examples: - // ```yaml - // receivers: - // examplereceiver: - // endpoint: 0.0.0.0:12345 - // examplereceiver/settings: - // endpoint: 0.0.0.0:12346 - // in this case, we have two ports, named: "examplereceiver" and "examplereceiver-settings" - receiversProperty, ok := config["receivers"] - if !ok { - return nil, ErrNoReceivers - } - recEnabled := GetEnabledReceivers(logger, config) - if recEnabled == nil { - return nil, ErrReceiversNotAMap - } - receivers, ok := receiversProperty.(map[interface{}]interface{}) - if !ok { - return nil, ErrReceiversNotAMap - } - - ports := []corev1.ContainerPort{} - for key, val := range receivers { - // This check will pass only the enabled receivers, - // then only the related ports will be opened. - if !recEnabled[key] { - continue - } - receiver, ok := val.(map[interface{}]interface{}) - if !ok { - logger.Info("receiver doesn't seem to be a map of properties", "receiver", key) - receiver = map[interface{}]interface{}{} - } - - rcvrName := key.(string) - rcvrParser := parser.For(logger, rcvrName, receiver) - - rcvrPorts, err := rcvrParser.ContainerPorts() - if err != nil { - // should we break the process and return an error, or just ignore this faulty parser - // and let the other parsers add their ports to the container? right now, the best - // option seems to be to log the failures and move on, instead of failing them all - logger.Error(err, "parser for '%s' has returned an error: %w", rcvrName, err) - continue - } - - if len(rcvrPorts) > 0 { - ports = append(ports, rcvrPorts...) - } - } - - sort.Slice(ports, func(i, j int) bool { - return ports[i].Name < ports[j].Name - }) - - return ports, nil -} diff --git a/pkg/collector/adapters/config_to_ports_test.go b/pkg/collector/adapters/config_to_ports_test.go index 41641fdcc1..fbc6989552 100644 --- a/pkg/collector/adapters/config_to_ports_test.go +++ b/pkg/collector/adapters/config_to_ports_test.go @@ -110,34 +110,6 @@ func TestExtractPortsFromConfig(t *testing.T) { assert.ElementsMatch(t, expectedPorts, ports) } -func TestExtractContainerPortsFromConfig(t *testing.T) { - // prepare - config, err := adapters.ConfigFromString(portConfigStr) - require.NoError(t, err) - require.NotEmpty(t, config) - - // test - ports, err := adapters.ConfigToContainerPorts(logger, config) - assert.NoError(t, err) - assert.Len(t, ports, 11) - - // verify - expectedPorts := []corev1.ContainerPort{ - {Name: "examplereceiver", ContainerPort: 12345}, - {Name: "examplereceiver-settings", ContainerPort: 12346}, - {Name: "jaeger-custom-thrift-http", Protocol: "TCP", ContainerPort: 15268}, - {Name: "jaeger-grpc", Protocol: "TCP", ContainerPort: 14250}, - {Name: "jaeger-thrift-binary", Protocol: "UDP", ContainerPort: 6833}, - {Name: "jaeger-thrift-compact", Protocol: "UDP", ContainerPort: 6831}, - {Name: "otlp-2-grpc", Protocol: "TCP", ContainerPort: 55555}, - {Name: "otlp-grpc", Protocol: "TCP", ContainerPort: 4317}, - {Name: "otlp-http", Protocol: "TCP", ContainerPort: 4318}, - {Name: "otlp-http-legacy", Protocol: "TCP", ContainerPort: 55681}, - {Name: "zipkin", Protocol: "TCP", ContainerPort: 9411}, - } - assert.ElementsMatch(t, expectedPorts, ports) -} - func TestNoPortsParsed(t *testing.T) { for _, tt := range []struct { expected error @@ -162,13 +134,10 @@ func TestNoPortsParsed(t *testing.T) { // test ports, err := adapters.ConfigToReceiverPorts(logger, config) - cPorts, cErr := adapters.ConfigToContainerPorts(logger, config) // verify assert.Nil(t, ports) assert.Equal(t, tt.expected, err) - assert.Nil(t, cPorts) - assert.Equal(t, tt.expected, cErr) }) } } @@ -238,8 +207,7 @@ func TestParserFailed(t *testing.T) { } type mockParser struct { - portsFunc func() ([]corev1.ServicePort, error) - containerPortsFunc func() ([]corev1.ContainerPort, error) + portsFunc func() ([]corev1.ServicePort, error) } func (m *mockParser) Ports() ([]corev1.ServicePort, error) { @@ -250,14 +218,6 @@ func (m *mockParser) Ports() ([]corev1.ServicePort, error) { return nil, nil } -func (m *mockParser) ContainerPorts() ([]corev1.ContainerPort, error) { - if m.containerPortsFunc != nil { - return m.containerPortsFunc() - } - - return nil, nil -} - func (m *mockParser) ParserName() string { return "__mock-adapters" } diff --git a/pkg/collector/container.go b/pkg/collector/container.go index 1344720ca3..32434ee427 100644 --- a/pkg/collector/container.go +++ b/pkg/collector/container.go @@ -32,6 +32,8 @@ import ( "github.com/open-telemetry/opentelemetry-operator/pkg/naming" ) +// maxPortLen allows us to truncate a port name according to what is considered valid port syntax: +// https://pkg.go.dev/k8s.io/apimachinery/pkg/util/validation#IsValidPortName const maxPortLen = 15 var errInvalidPort = errors.New("invalid port name/num") @@ -43,23 +45,14 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1alpha1.OpenTelem image = cfg.CollectorImage() } - // build port map + // build container ports from service ports ports := getConfigContainerPorts(logger, otelcol.Spec.Config) - for _, p := range otelcol.Spec.ContainerPorts { - truncName := naming.Truncate(p.Name, maxPortLen) - if p.Name != truncName { - logger.Info("truncating container port name", - "port.name.prev", p.Name, "port.name.new", truncName) - p.Name = truncName + for _, p := range otelcol.Spec.Ports { + ports[p.Name] = corev1.ContainerPort{ + Name: p.Name, + ContainerPort: p.Port, + Protocol: p.Protocol, } - nameErrs := validation.IsValidPortName(p.Name) - numErrs := validation.IsValidPortNum(int(p.ContainerPort)) - if len(nameErrs) > 0 || len(numErrs) > 0 { - logger.Error(errInvalidPort, "dropping container port", "port.name", p.Name, "port.num", p.ContainerPort, - "port.name.errs", nameErrs, "num.errs", numErrs) - continue - } - ports[p.Name] = p } argsMap := otelcol.Spec.Args @@ -136,7 +129,7 @@ func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.C logger.Error(err, "couldn't extract the configuration") return ports } - ps, err := adapters.ConfigToContainerPorts(logger, c) + ps, err := adapters.ConfigToReceiverPorts(logger, c) if err != nil { logger.Error(err, "couldn't build container ports from configuration") } else { @@ -145,16 +138,19 @@ func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.C if p.Name != truncName { logger.Info("truncating container port name", "port.name.prev", p.Name, "port.name.new", truncName) - p.Name = truncName } - nameErrs := validation.IsValidPortName(p.Name) - numErrs := validation.IsValidPortNum(int(p.ContainerPort)) + nameErrs := validation.IsValidPortName(truncName) + numErrs := validation.IsValidPortNum(int(p.Port)) if len(nameErrs) > 0 || len(numErrs) > 0 { - logger.Error(errInvalidPort, "dropping container port", "port.name", p.Name, "port.num", p.ContainerPort, + logger.Error(errInvalidPort, "dropping container port", "port.name", truncName, "port.num", p.Port, "port.name.errs", nameErrs, "num.errs", numErrs) continue } - ports[p.Name] = p + ports[truncName] = corev1.ContainerPort{ + Name: truncName, + ContainerPort: p.Port, + Protocol: p.Protocol, + } } } @@ -171,6 +167,7 @@ func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.C return ports } +// getMetricsPort gets the port number for the metrics endpoint from the collector config if it has been set. func getMetricsPort(c map[interface{}]interface{}) (int32, error) { // we don't need to unmarshal the whole config, just follow the keys down to // the metrics address. diff --git a/pkg/collector/container_test.go b/pkg/collector/container_test.go index fdd0ed36cd..4f95a9012e 100644 --- a/pkg/collector/container_test.go +++ b/pkg/collector/container_test.go @@ -78,7 +78,7 @@ service: tests := []struct { description string specConfig string - specPorts []corev1.ContainerPort + specPorts []corev1.ServicePort expectedPorts []corev1.ContainerPort }{ { @@ -107,10 +107,10 @@ service: }, { description: "ports in spec ContainerPorts", - specPorts: []corev1.ContainerPort{ + specPorts: []corev1.ServicePort{ { - Name: "testport1", - ContainerPort: 12345, + Name: "testport1", + Port: 12345, }, }, expectedPorts: []corev1.ContainerPort{ @@ -124,15 +124,15 @@ service: { description: "ports in spec Config and ContainerPorts", specConfig: goodConfig, - specPorts: []corev1.ContainerPort{ + specPorts: []corev1.ServicePort{ { - Name: "testport1", - ContainerPort: 12345, + Name: "testport1", + Port: 12345, }, { - Name: "testport2", - ContainerPort: 54321, - Protocol: corev1.ProtocolUDP, + Name: "testport2", + Port: 54321, + Protocol: corev1.ProtocolUDP, }, }, expectedPorts: []corev1.ContainerPort{ @@ -155,12 +155,12 @@ service: { description: "invalid port name", specConfig: goodConfig, - specPorts: []corev1.ContainerPort{ + specPorts: []corev1.ServicePort{ { // this port name contains a non alphanumeric character, which is invalid. - Name: "-test🦄port", - ContainerPort: 12345, - Protocol: corev1.ProtocolTCP, + Name: "-test🦄port", + Port: 12345, + Protocol: corev1.ProtocolTCP, }, }, expectedPorts: []corev1.ContainerPort{ @@ -174,12 +174,12 @@ service: { description: "long port name", specConfig: goodConfig, - specPorts: []corev1.ContainerPort{ + specPorts: []corev1.ServicePort{ { // this port name is longer than 15 characters, which is invalid. - Name: "testportaaaabbbb", - ContainerPort: 5, - Protocol: corev1.ProtocolTCP, + Name: "testportaaaabbbb", + Port: 5, + Protocol: corev1.ProtocolTCP, }, }, expectedPorts: []corev1.ContainerPort{ @@ -198,14 +198,14 @@ service: { description: "duplicate port name", specConfig: goodConfig, - specPorts: []corev1.ContainerPort{ + specPorts: []corev1.ServicePort{ { - Name: "testport1", - ContainerPort: 12345, + Name: "testport1", + Port: 12345, }, { - Name: "testport1", - ContainerPort: 11111, + Name: "testport1", + Port: 11111, }, }, expectedPorts: []corev1.ContainerPort{ @@ -227,8 +227,8 @@ service: // prepare otelcol := v1alpha1.OpenTelemetryCollector{ Spec: v1alpha1.OpenTelemetryCollectorSpec{ - Config: testCase.specConfig, - ContainerPorts: testCase.specPorts, + Config: testCase.specConfig, + Ports: testCase.specPorts, }, } cfg := config.New(config.WithCollectorImage("default-image")) diff --git a/pkg/collector/parser/receiver.go b/pkg/collector/parser/receiver.go index 6c1fc5c523..ef950c89b7 100644 --- a/pkg/collector/parser/receiver.go +++ b/pkg/collector/parser/receiver.go @@ -37,9 +37,6 @@ type ReceiverParser interface { // Ports returns the service ports parsed based on the receiver's configuration Ports() ([]corev1.ServicePort, error) - // ContainerPorts returns the container ports parsed based on the receiver's configuration. - ContainerPorts() ([]corev1.ContainerPort, error) - // ParserName returns the name of this parser ParserName() string } @@ -136,60 +133,6 @@ func singlePortFromConfigEndpoint(logger logr.Logger, name string, config map[in return nil } -func singleContainerPortFromConfigEndpoint(logger logr.Logger, name string, config map[interface{}]interface{}) *v1.ContainerPort { - var endpoint interface{} - switch { - // syslog receiver contains the endpoint - // that needs to be exposed one level down inside config - // i.e. either in tcp or udp section with field key - // as `listen_address` - case name == "syslog": - var c map[interface{}]interface{} - if udp, isUDP := config["udp"]; isUDP && udp != nil { - c = udp.(map[interface{}]interface{}) - endpoint = getAddressFromConfig(logger, name, listenAddressKey, c) - } else if tcp, isTCP := config["tcp"]; isTCP && tcp != nil { - c = tcp.(map[interface{}]interface{}) - endpoint = getAddressFromConfig(logger, name, listenAddressKey, c) - } - - // tcplog and udplog receivers hold the endpoint - // value in `listen_address` field - case name == "tcplog" || name == "udplog": - endpoint = getAddressFromConfig(logger, name, listenAddressKey, config) - - // ignore kubeletstats receiver as it holds the field key endpoint, and it - // is a scraper, we only expose endpoint through k8s service objects for - // receivers that aren't scrapers. - case name == "kubeletstats": - return nil - - // ignore prometheus receiver as it has no listening endpoint - case name == "prometheus": - return nil - - default: - endpoint = getAddressFromConfig(logger, name, endpointKey, config) - } - - switch endpoint := endpoint.(type) { - case string: - port, err := portFromEndpoint(endpoint) - if err != nil { - logger.WithValues(endpointKey, endpoint).Info("couldn't parse the endpoint's port") - return nil - } - - return &corev1.ContainerPort{ - Name: portName(name, port), - ContainerPort: port, - } - default: - logger.Info("receiver's endpoint isn't a string") - } - - return nil -} func getAddressFromConfig(logger logr.Logger, name, key string, config map[interface{}]interface{}) interface{} { endpoint, ok := config[key] diff --git a/pkg/collector/parser/receiver_generic.go b/pkg/collector/parser/receiver_generic.go index 7751cde05a..21b4e2ce53 100644 --- a/pkg/collector/parser/receiver_generic.go +++ b/pkg/collector/parser/receiver_generic.go @@ -68,25 +68,6 @@ func (g *GenericReceiver) Ports() ([]corev1.ServicePort, error) { return []corev1.ServicePort{}, nil } -// ContainerPorts returns all the container ports for all protocols in this parser. -func (g *GenericReceiver) ContainerPorts() ([]corev1.ContainerPort, error) { - port := singleContainerPortFromConfigEndpoint(g.logger, g.name, g.config) - if port != nil { - port.Protocol = g.defaultProtocol - return []corev1.ContainerPort{*port}, nil - } - - if g.defaultPort > 0 { - return []corev1.ContainerPort{{ - ContainerPort: g.defaultPort, - Name: portName(g.name, g.defaultPort), - Protocol: g.defaultProtocol, - }}, nil - } - - return []corev1.ContainerPort{}, nil -} - // ParserName returns the name of this parser. func (g *GenericReceiver) ParserName() string { return g.parserName diff --git a/pkg/collector/parser/receiver_generic_test.go b/pkg/collector/parser/receiver_generic_test.go index c3e5132403..15fd903591 100644 --- a/pkg/collector/parser/receiver_generic_test.go +++ b/pkg/collector/parser/receiver_generic_test.go @@ -33,25 +33,13 @@ func TestParseEndpoint(t *testing.T) { "endpoint": "0.0.0.0:1234", }) - t.Run("service port parsed", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 1234, ports[0].Port) - }) - - t.Run("container port parsed", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() + // test + ports, err := builder.Ports() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 1234, ports[0].ContainerPort) - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].Port) } func TestFailedToParseEndpoint(t *testing.T) { @@ -61,23 +49,12 @@ func TestFailedToParseEndpoint(t *testing.T) { "endpoint": "0.0.0.0", }) - t.Run("no service ports", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 0) - }) - - t.Run("no container ports", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() + // test + ports, err := builder.Ports() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 0) - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) } func TestDownstreamParsers(t *testing.T) { @@ -113,7 +90,7 @@ func TestDownstreamParsers(t *testing.T) { assert.Equal(t, tt.parserName, builder.ParserName()) }) - t.Run("assigns the expected service port", func(t *testing.T) { + t.Run("assigns the expected port", func(t *testing.T) { // prepare builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{}) @@ -127,21 +104,7 @@ func TestDownstreamParsers(t *testing.T) { assert.Equal(t, tt.receiverName, ports[0].Name) }) - t.Run("assigns the expected container port", func(t *testing.T) { - // prepare - builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{}) - - // test - ports, err := builder.ContainerPorts() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, tt.defaultPort, ports[0].ContainerPort) - assert.Equal(t, tt.receiverName, ports[0].Name) - }) - - t.Run("allows service port to be overridden", func(t *testing.T) { + t.Run("allows port to be overridden", func(t *testing.T) { // prepare builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{ "endpoint": "0.0.0.0:65535", @@ -156,22 +119,6 @@ func TestDownstreamParsers(t *testing.T) { assert.EqualValues(t, 65535, ports[0].Port) assert.Equal(t, tt.receiverName, ports[0].Name) }) - - t.Run("allows container port to be overridden", func(t *testing.T) { - // prepare - builder := tt.builder(logger, tt.receiverName, map[interface{}]interface{}{ - "endpoint": "0.0.0.0:65535", - }) - - // test - ports, err := builder.ContainerPorts() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 65535, ports[0].ContainerPort) - assert.Equal(t, tt.receiverName, ports[0].Name) - }) }) } } diff --git a/pkg/collector/parser/receiver_jaeger.go b/pkg/collector/parser/receiver_jaeger.go index 14286b2d11..428938c313 100644 --- a/pkg/collector/parser/receiver_jaeger.go +++ b/pkg/collector/parser/receiver_jaeger.go @@ -125,71 +125,6 @@ func (j *JaegerReceiverParser) Ports() ([]corev1.ServicePort, error) { return ports, nil } -// ContainerPorts returns all the container ports for all protocols in this parser. -func (j *JaegerReceiverParser) ContainerPorts() ([]corev1.ContainerPort, error) { - ports := []corev1.ContainerPort{} - - for _, protocol := range []struct { - name string - transportProtocol corev1.Protocol - appProtocol string - defaultPort int32 - }{ - { - name: "grpc", - defaultPort: defaultGRPCPort, - transportProtocol: corev1.ProtocolTCP, - appProtocol: "grpc", - }, - { - name: "thrift_http", - defaultPort: defaultThriftHTTPPort, - transportProtocol: corev1.ProtocolTCP, - appProtocol: "http", - }, - { - name: "thrift_compact", - defaultPort: defaultThriftCompactPort, - transportProtocol: corev1.ProtocolUDP, - }, - { - name: "thrift_binary", - defaultPort: defaultThriftBinaryPort, - transportProtocol: corev1.ProtocolUDP, - }, - } { - // do we have the protocol specified at all? - if receiverProtocol, ok := j.config[protocol.name]; ok { - // we have the specified protocol, we definitely need a container port - nameWithProtocol := fmt.Sprintf("%s-%s", j.name, protocol.name) - var protocolPort *corev1.ContainerPort - - // do we have a configuration block for the protocol? - settings, ok := receiverProtocol.(map[interface{}]interface{}) - if ok { - protocolPort = singleContainerPortFromConfigEndpoint(j.logger, nameWithProtocol, settings) - } - - // have we parsed a port based on the configuration block? - // if not, we use the default port - if protocolPort == nil { - protocolPort = &corev1.ContainerPort{ - Name: portName(nameWithProtocol, protocol.defaultPort), - ContainerPort: protocol.defaultPort, - } - } - - // set the appropriate transport protocol (i.e. TCP/UDP) for this kind of receiver protocol - protocolPort.Protocol = protocol.transportProtocol - - // at this point, we *have* a port specified, add it to the list of ports - ports = append(ports, *protocolPort) - } - } - - return ports, nil -} - // ParserName returns the name of this parser. func (j *JaegerReceiverParser) ParserName() string { return parserNameJaeger diff --git a/pkg/collector/parser/receiver_jaeger_test.go b/pkg/collector/parser/receiver_jaeger_test.go index 7af96ba174..827c988c4a 100644 --- a/pkg/collector/parser/receiver_jaeger_test.go +++ b/pkg/collector/parser/receiver_jaeger_test.go @@ -42,27 +42,14 @@ func TestJaegerMinimalConfiguration(t *testing.T) { }, }) - t.Run("service port exists", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 14250, ports[0].Port) - assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) - }) - - t.Run("container port exists", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() + // test + ports, err := builder.Ports() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 14250, ports[0].ContainerPort) - assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 14250, ports[0].Port) + assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) } func TestJaegerPortsOverridden(t *testing.T) { @@ -75,27 +62,14 @@ func TestJaegerPortsOverridden(t *testing.T) { }, }) - t.Run("service ports overridden", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 1234, ports[0].Port) - assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) - }) - - t.Run("container ports overridden", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() + // test + ports, err := builder.Ports() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 1) - assert.EqualValues(t, 1234, ports[0].ContainerPort) - assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 1234, ports[0].Port) + assert.EqualValues(t, corev1.ProtocolTCP, ports[0].Protocol) } func TestJaegerExposeDefaultPorts(t *testing.T) { @@ -112,6 +86,7 @@ func TestJaegerExposeDefaultPorts(t *testing.T) { expectedResults := map[string]struct { transportProtocol corev1.Protocol portNumber int32 + seen bool }{ "jaeger-grpc": {portNumber: 14250, transportProtocol: corev1.ProtocolTCP}, "jaeger-thrift-http": {portNumber: 14268, transportProtocol: corev1.ProtocolTCP}, @@ -119,45 +94,21 @@ func TestJaegerExposeDefaultPorts(t *testing.T) { "jaeger-thrift-binary": {portNumber: 6832, transportProtocol: corev1.ProtocolUDP}, } - t.Run("service ports exposed", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 4) - - seen := map[string]bool{} - for _, port := range ports { - r, ok := expectedResults[port.Name] - seen[port.Name] = true - assert.True(t, ok, "unexpected service port %s", port.Name) - assert.EqualValues(t, r.portNumber, port.Port) - assert.EqualValues(t, r.transportProtocol, port.Protocol) - } - for k := range expectedResults { - assert.True(t, seen[k], "the port %s wasn't included in the service ports", k) - } - }) + // test + ports, err := builder.Ports() - t.Run("container ports exposed", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 4) - - seen := map[string]bool{} - for _, port := range ports { - r, ok := expectedResults[port.Name] - seen[port.Name] = true - assert.True(t, ok, "unexpected container port %s", port.Name) - assert.EqualValues(t, r.portNumber, port.ContainerPort) - assert.EqualValues(t, r.transportProtocol, port.Protocol) - } - for k := range expectedResults { - assert.True(t, seen[k], "the port %s wasn't included in the container ports", k) - } - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 4) + + for _, port := range ports { + r := expectedResults[port.Name] + r.seen = true + expectedResults[port.Name] = r + assert.EqualValues(t, r.portNumber, port.Port) + assert.EqualValues(t, r.transportProtocol, port.Protocol) + } + for k, v := range expectedResults { + assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) + } } diff --git a/pkg/collector/parser/receiver_otlp.go b/pkg/collector/parser/receiver_otlp.go index 7fa00223c3..2fa208fe94 100644 --- a/pkg/collector/parser/receiver_otlp.go +++ b/pkg/collector/parser/receiver_otlp.go @@ -130,66 +130,6 @@ func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) { return ports, nil } -// ContainerPorts returns all the container ports for all protocols in this parser. -func (o *OTLPReceiverParser) ContainerPorts() ([]corev1.ContainerPort, error) { - ports := []corev1.ContainerPort{} - - for _, protocol := range []struct { - name string - defaultPorts []corev1.ContainerPort - }{ - { - name: grpc, - defaultPorts: []corev1.ContainerPort{ - { - Name: portName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort), - ContainerPort: defaultOTLPGRPCPort, - Protocol: corev1.ProtocolTCP, - }, - }, - }, - { - name: http, - defaultPorts: []corev1.ContainerPort{ - { - Name: portName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort), - ContainerPort: defaultOTLPHTTPPort, - Protocol: corev1.ProtocolTCP, - }, - { - Name: portName(fmt.Sprintf("%s-http-legacy", o.name), defaultOTLPHTTPLegacyPort), - ContainerPort: defaultOTLPHTTPLegacyPort, - Protocol: corev1.ProtocolTCP, - }, - }, - }, - } { - // do we have the protocol specified at all? - if receiverProtocol, ok := o.config[protocol.name]; ok { - // we have the specified protocol, we definitely need a container port - nameWithProtocol := fmt.Sprintf("%s-%s", o.name, protocol.name) - var protocolPort *corev1.ContainerPort - - // do we have a configuration block for the protocol? - settings, ok := receiverProtocol.(map[interface{}]interface{}) - if ok { - protocolPort = singleContainerPortFromConfigEndpoint(o.logger, nameWithProtocol, settings) - } - - // have we parsed a port based on the configuration block? - // if not, we use the default port - if protocolPort == nil { - ports = append(ports, protocol.defaultPorts...) - } else { - protocolPort.Protocol = corev1.ProtocolTCP - ports = append(ports, *protocolPort) - } - } - } - - return ports, nil -} - // ParserName returns the name of this parser. func (o *OTLPReceiverParser) ParserName() string { return parserNameOTLP diff --git a/pkg/collector/parser/receiver_otlp_test.go b/pkg/collector/parser/receiver_otlp_test.go index da98ed88df..6abca3b5d1 100644 --- a/pkg/collector/parser/receiver_otlp_test.go +++ b/pkg/collector/parser/receiver_otlp_test.go @@ -48,50 +48,28 @@ func TestOTLPPortsOverridden(t *testing.T) { expectedResults := map[string]struct { portNumber int32 + seen bool }{ "otlp-grpc": {portNumber: 1234}, "otlp-http": {portNumber: 1235}, } - t.Run("service ports overridden", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, len(expectedResults)) - - seen := map[string]bool{} - for _, port := range ports { - r, ok := expectedResults[port.Name] - seen[port.Name] = true - assert.True(t, ok, "unexpected service port %s", port.Name) - assert.EqualValues(t, r.portNumber, port.Port) - } - for k := range expectedResults { - assert.True(t, seen[k], "the port %s wasn't included in the service ports", k) - } - }) + // test + ports, err := builder.Ports() - t.Run("container ports overridden", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, len(expectedResults)) - - seen := map[string]bool{} - for _, port := range ports { - r, ok := expectedResults[port.Name] - seen[port.Name] = true - assert.True(t, ok, "unexpected container port %s", port.Name) - assert.EqualValues(t, r.portNumber, port.ContainerPort) - } - for k := range expectedResults { - assert.True(t, seen[k], "the port %s wasn't included in the container ports", k) - } - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + for _, port := range ports { + r := expectedResults[port.Name] + r.seen = true + expectedResults[port.Name] = r + assert.EqualValues(t, r.portNumber, port.Port) + } + for k, v := range expectedResults { + assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) + } } func TestOTLPExposeDefaultPorts(t *testing.T) { @@ -105,49 +83,27 @@ func TestOTLPExposeDefaultPorts(t *testing.T) { expectedResults := map[string]struct { portNumber int32 + seen bool }{ "otlp-grpc": {portNumber: 4317}, "otlp-http": {portNumber: 4318}, "otlp-http-legacy": {portNumber: 55681}, } - t.Run("service ports exposed", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, len(expectedResults)) - - seen := map[string]bool{} - for _, port := range ports { - r, ok := expectedResults[port.Name] - seen[port.Name] = true - assert.True(t, ok, "unexpected service port %s", port.Name) - assert.EqualValues(t, r.portNumber, port.Port) - } - for k := range expectedResults { - assert.True(t, seen[k], "the port %s wasn't included in the service ports", k) - } - }) + // test + ports, err := builder.Ports() - t.Run("container ports exposed", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, len(expectedResults)) - - seen := map[string]bool{} - for _, port := range ports { - r, ok := expectedResults[port.Name] - seen[port.Name] = true - assert.True(t, ok, "unexpected container port %s", port.Name) - assert.EqualValues(t, r.portNumber, port.ContainerPort) - } - for k := range expectedResults { - assert.True(t, seen[k], "the port %s wasn't included in the container ports", k) - } - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, len(expectedResults)) + + for _, port := range ports { + r := expectedResults[port.Name] + r.seen = true + expectedResults[port.Name] = r + assert.EqualValues(t, r.portNumber, port.Port) + } + for k, v := range expectedResults { + assert.True(t, v.seen, "the port %s wasn't included in the service ports", k) + } } diff --git a/pkg/collector/parser/receiver_test.go b/pkg/collector/parser/receiver_test.go index 5acb361631..2566f7a7be 100644 --- a/pkg/collector/parser/receiver_test.go +++ b/pkg/collector/parser/receiver_test.go @@ -92,21 +92,11 @@ func TestReceiverFailsWhenPortIsntString(t *testing.T) { "endpoint": 123, } - t.Run("service port fails", func(t *testing.T) { - // test - p := singlePortFromConfigEndpoint(logger, "myreceiver", config) - - // verify - assert.Nil(t, p) - }) - - t.Run("container port fails", func(t *testing.T) { - // test - p := singleContainerPortFromConfigEndpoint(logger, "myreceiver", config) + // test + p := singlePortFromConfigEndpoint(logger, "myreceiver", config) - // verify - assert.Nil(t, p) - }) + // verify + assert.Nil(t, p) } func TestIgnorekubeletstatsEndpoint(t *testing.T) { @@ -116,23 +106,12 @@ func TestIgnorekubeletstatsEndpoint(t *testing.T) { "endpoint": "0.0.0.0:9000", }) - t.Run("no service ports", func(t *testing.T) { - // test - ports, err := builder.Ports() - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 0) - }) - - t.Run("no container ports", func(t *testing.T) { - // test - ports, err := builder.ContainerPorts() + // test + ports, err := builder.Ports() - // verify - assert.NoError(t, err) - assert.Len(t, ports, 0) - }) + // verify + assert.NoError(t, err) + assert.Len(t, ports, 0) } func TestReceiverFallbackWhenNotRegistered(t *testing.T) { @@ -165,10 +144,6 @@ func (m *mockParser) Ports() ([]corev1.ServicePort, error) { return nil, nil } -func (m *mockParser) ContainerPorts() ([]corev1.ContainerPort, error) { - return nil, nil -} - func (m *mockParser) ParserName() string { return "__mock" } From a76c99f5302c568a38389ecf59807fb4fd17e8f6 Mon Sep 17 00:00:00 2001 From: Kristina Pathak Date: Mon, 26 Sep 2022 19:51:27 -0700 Subject: [PATCH 3/4] add, fix tests --- .../opentelemetrycollector_webhook.go | 10 +- .../opentelemetrycollector_webhook_test.go | 254 ++++++++++++++++++ pkg/collector/container_test.go | 43 --- 3 files changed, 260 insertions(+), 47 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_webhook.go b/apis/v1alpha1/opentelemetrycollector_webhook.go index 12b6d98904..b673fd9cf0 100644 --- a/apis/v1alpha1/opentelemetrycollector_webhook.go +++ b/apis/v1alpha1/opentelemetrycollector_webhook.go @@ -16,9 +16,9 @@ package v1alpha1 import ( "fmt" - "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation" ctrl "sigs.k8s.io/controller-runtime" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -133,7 +133,7 @@ func (r *OpenTelemetryCollector) validateCRDSpec() error { // validate autoscale with horizontal pod autoscaler if r.Spec.MaxReplicas != nil { - if *r.Spec.MaxReplicas < int32(1) { + if *r.Spec.MaxReplicas <= int32(1) { return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, maxReplicas should be defined and more than one") } @@ -150,11 +150,13 @@ func (r *OpenTelemetryCollector) validateCRDSpec() error { } if r.Spec.Autoscaler != nil && r.Spec.Autoscaler.Behavior != nil { - if r.Spec.Autoscaler.Behavior.ScaleDown != nil && *r.Spec.Autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds < int32(1) { + if r.Spec.Autoscaler.Behavior.ScaleDown != nil && r.Spec.Autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds != nil && + *r.Spec.Autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds < int32(1) { return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, scaleDown should be one or more") } - if r.Spec.Autoscaler.Behavior.ScaleUp != nil && *r.Spec.Autoscaler.Behavior.ScaleUp.StabilizationWindowSeconds < int32(1) { + if r.Spec.Autoscaler.Behavior.ScaleUp != nil && r.Spec.Autoscaler.Behavior.ScaleUp.StabilizationWindowSeconds != nil && + *r.Spec.Autoscaler.Behavior.ScaleUp.StabilizationWindowSeconds < int32(1) { return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, scaleUp should be one or more") } } diff --git a/apis/v1alpha1/opentelemetrycollector_webhook_test.go b/apis/v1alpha1/opentelemetrycollector_webhook_test.go index 39dc709551..3921dc0b78 100644 --- a/apis/v1alpha1/opentelemetrycollector_webhook_test.go +++ b/apis/v1alpha1/opentelemetrycollector_webhook_test.go @@ -18,6 +18,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + autoscalingv2 "k8s.io/api/autoscaling/v2" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -76,3 +78,255 @@ func TestOTELColDefaultingWebhook(t *testing.T) { }) } } + +func TestOTELColValidatingWebhook(t *testing.T) { + zero := int32(0) + one := int32(1) + three := int32(3) + five := int32(5) + + tests := []struct { //nolint:govet + name string + otelcol OpenTelemetryCollector + expectedErr string + }{ + { + name: "valid empty spec", + otelcol: OpenTelemetryCollector{}, + }, + { + name: "valid full spec", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Mode: ModeStatefulSet, + MinReplicas: &one, + Replicas: &three, + MaxReplicas: &five, + UpgradeStrategy: "adhoc", + TargetAllocator: OpenTelemetryTargetAllocator{ + Enabled: true, + }, + Config: `receivers: + examplereceiver: + endpoint: "0.0.0.0:12345" + examplereceiver/settings: + endpoint: "0.0.0.0:12346" + prometheus: + config: + scrape_config: + job_name: otel-collector + scrape_interval: 10s + jaeger/custom: + protocols: + thrift_http: + endpoint: 0.0.0.0:15268 +`, + Ports: []v1.ServicePort{ + { + Name: "port1", + Port: 5555, + }, + { + Name: "port2", + Port: 5554, + Protocol: v1.ProtocolUDP, + }, + }, + Autoscaler: &AutoscalerSpec{ + Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{ + ScaleDown: &autoscalingv2.HPAScalingRules{ + StabilizationWindowSeconds: &three, + }, + ScaleUp: &autoscalingv2.HPAScalingRules{ + StabilizationWindowSeconds: &five, + }, + }, + TargetCPUUtilization: &five, + }, + }, + }, + }, + { + name: "invalid mode with volume claim templates", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Mode: ModeSidecar, + VolumeClaimTemplates: []v1.PersistentVolumeClaim{{}, {}}, + }, + }, + expectedErr: "does not support the attribute 'volumeClaimTemplates'", + }, + { + name: "invalid mode with tolerations", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Mode: ModeSidecar, + Tolerations: []v1.Toleration{{}, {}}, + }, + }, + expectedErr: "does not support the attribute 'tolerations'", + }, + { + name: "invalid mode with target allocator", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Mode: ModeDeployment, + TargetAllocator: OpenTelemetryTargetAllocator{ + Enabled: true, + }, + }, + }, + expectedErr: "does not support the target allocation deployment", + }, + { + name: "invalid target allocator config", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Mode: ModeStatefulSet, + TargetAllocator: OpenTelemetryTargetAllocator{ + Enabled: true, + }, + }, + }, + expectedErr: "the OpenTelemetry Spec Prometheus configuration is incorrect", + }, + { + name: "invalid port name", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Ports: []v1.ServicePort{ + { + // this port name contains a non alphanumeric character, which is invalid. + Name: "-test🦄port", + Port: 12345, + Protocol: v1.ProtocolTCP, + }, + }, + }, + }, + expectedErr: "the OpenTelemetry Spec Ports configuration is incorrect", + }, + { + name: "invalid port name, too long", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Ports: []v1.ServicePort{ + { + Name: "aaaabbbbccccdddd", // len: 16, too long + Port: 5555, + }, + }, + }, + }, + expectedErr: "the OpenTelemetry Spec Ports configuration is incorrect", + }, + { + name: "invalid port num", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + Ports: []v1.ServicePort{ + { + Name: "aaaabbbbccccddd", // len: 15 + // no port set means it's 0, which is invalid + }, + }, + }, + }, + expectedErr: "the OpenTelemetry Spec Ports configuration is incorrect", + }, + { + name: "invalid max replicas", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + MaxReplicas: &one, + }, + }, + expectedErr: "maxReplicas should be defined and more than one", + }, + { + name: "invalid replicas, greater than max", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + MaxReplicas: &three, + Replicas: &five, + }, + }, + expectedErr: "replicas must not be greater than maxReplicas", + }, + { + name: "invalid min replicas, greater than max", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + MaxReplicas: &three, + MinReplicas: &five, + }, + }, + expectedErr: "minReplicas must not be greater than maxReplicas", + }, + { + name: "invalid min replicas, lesser than 1", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + MaxReplicas: &three, + MinReplicas: &zero, + }, + }, + expectedErr: "minReplicas should be one or more", + }, + { + name: "invalid autoscaler scale down", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + MaxReplicas: &three, + Autoscaler: &AutoscalerSpec{ + Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{ + ScaleDown: &autoscalingv2.HPAScalingRules{ + StabilizationWindowSeconds: &zero, + }, + }, + }, + }, + }, + expectedErr: "scaleDown should be one or more", + }, + { + name: "invalid autoscaler scale up", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + MaxReplicas: &three, + Autoscaler: &AutoscalerSpec{ + Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{ + ScaleUp: &autoscalingv2.HPAScalingRules{ + StabilizationWindowSeconds: &zero, + }, + }, + }, + }, + }, + expectedErr: "scaleUp should be one or more", + }, + { + name: "invalid autoscaler target cpu utilization", + otelcol: OpenTelemetryCollector{ + Spec: OpenTelemetryCollectorSpec{ + MaxReplicas: &three, + Autoscaler: &AutoscalerSpec{ + TargetCPUUtilization: &zero, + }, + }, + }, + expectedErr: "targetCPUUtilization should be greater than 0 and less than 100", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + err := test.otelcol.validateCRDSpec() + if test.expectedErr == "" { + assert.NoError(t, err) + return + } + assert.ErrorContains(t, err, test.expectedErr) + }) + } +} diff --git a/pkg/collector/container_test.go b/pkg/collector/container_test.go index 4f95a9012e..a0d7155e59 100644 --- a/pkg/collector/container_test.go +++ b/pkg/collector/container_test.go @@ -152,49 +152,6 @@ service: }, }, }, - { - description: "invalid port name", - specConfig: goodConfig, - specPorts: []corev1.ServicePort{ - { - // this port name contains a non alphanumeric character, which is invalid. - Name: "-test🦄port", - Port: 12345, - Protocol: corev1.ProtocolTCP, - }, - }, - expectedPorts: []corev1.ContainerPort{ - { - Name: "examplereceiver", - ContainerPort: 12345, - }, - metricContainerPort, - }, - }, - { - description: "long port name", - specConfig: goodConfig, - specPorts: []corev1.ServicePort{ - { - // this port name is longer than 15 characters, which is invalid. - Name: "testportaaaabbbb", - Port: 5, - Protocol: corev1.ProtocolTCP, - }, - }, - expectedPorts: []corev1.ContainerPort{ - { - Name: "examplereceiver", - ContainerPort: 12345, - }, - metricContainerPort, - { - Name: "testportaaaabbb", - ContainerPort: 5, - Protocol: corev1.ProtocolTCP, - }, - }, - }, { description: "duplicate port name", specConfig: goodConfig, From 322ee43803d564cd058c3ba5bbba90a3f6058d26 Mon Sep 17 00:00:00 2001 From: Kristina Pathak Date: Mon, 3 Oct 2022 10:53:39 -0700 Subject: [PATCH 4/4] move logic changes to their own branch --- apis/v1alpha1/opentelemetrycollector_webhook.go | 8 +++----- apis/v1alpha1/opentelemetrycollector_webhook_test.go | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/apis/v1alpha1/opentelemetrycollector_webhook.go b/apis/v1alpha1/opentelemetrycollector_webhook.go index b673fd9cf0..2c09add4ce 100644 --- a/apis/v1alpha1/opentelemetrycollector_webhook.go +++ b/apis/v1alpha1/opentelemetrycollector_webhook.go @@ -133,7 +133,7 @@ func (r *OpenTelemetryCollector) validateCRDSpec() error { // validate autoscale with horizontal pod autoscaler if r.Spec.MaxReplicas != nil { - if *r.Spec.MaxReplicas <= int32(1) { + if *r.Spec.MaxReplicas < int32(1) { return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, maxReplicas should be defined and more than one") } @@ -150,13 +150,11 @@ func (r *OpenTelemetryCollector) validateCRDSpec() error { } if r.Spec.Autoscaler != nil && r.Spec.Autoscaler.Behavior != nil { - if r.Spec.Autoscaler.Behavior.ScaleDown != nil && r.Spec.Autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds != nil && - *r.Spec.Autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds < int32(1) { + if r.Spec.Autoscaler.Behavior.ScaleDown != nil && *r.Spec.Autoscaler.Behavior.ScaleDown.StabilizationWindowSeconds < int32(1) { return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, scaleDown should be one or more") } - if r.Spec.Autoscaler.Behavior.ScaleUp != nil && r.Spec.Autoscaler.Behavior.ScaleUp.StabilizationWindowSeconds != nil && - *r.Spec.Autoscaler.Behavior.ScaleUp.StabilizationWindowSeconds < int32(1) { + if r.Spec.Autoscaler.Behavior.ScaleUp != nil && *r.Spec.Autoscaler.Behavior.ScaleUp.StabilizationWindowSeconds < int32(1) { return fmt.Errorf("the OpenTelemetry Spec autoscale configuration is incorrect, scaleUp should be one or more") } } diff --git a/apis/v1alpha1/opentelemetrycollector_webhook_test.go b/apis/v1alpha1/opentelemetrycollector_webhook_test.go index 3921dc0b78..7396b335ee 100644 --- a/apis/v1alpha1/opentelemetrycollector_webhook_test.go +++ b/apis/v1alpha1/opentelemetrycollector_webhook_test.go @@ -238,7 +238,7 @@ func TestOTELColValidatingWebhook(t *testing.T) { name: "invalid max replicas", otelcol: OpenTelemetryCollector{ Spec: OpenTelemetryCollectorSpec{ - MaxReplicas: &one, + MaxReplicas: &zero, }, }, expectedErr: "maxReplicas should be defined and more than one",