From 7625a21f11626995b0588a6e55330f7b4a80996a Mon Sep 17 00:00:00 2001 From: Ruben Vargas Date: Sun, 12 Jun 2022 23:07:49 -0500 Subject: [PATCH] expose OTLP collector and allInOne ports Signed-off-by: Ruben Vargas --- pkg/deployment/all_in_one.go | 138 +++++++++++++++--------------- pkg/deployment/all_in_one_test.go | 4 + pkg/deployment/args.go | 13 --- pkg/deployment/args_test.go | 26 ------ pkg/deployment/collector.go | 76 ++++++++-------- pkg/deployment/collector_test.go | 16 ++++ pkg/deployment/ingester.go | 2 +- pkg/deployment/otlp.go | 37 ++++++++ pkg/deployment/query.go | 2 +- pkg/service/collector.go | 61 +++++++++---- pkg/util/util.go | 32 +++++++ pkg/util/util_test.go | 120 ++++++++++++++++++++++++++ 12 files changed, 364 insertions(+), 163 deletions(-) delete mode 100644 pkg/deployment/args.go delete mode 100644 pkg/deployment/args_test.go create mode 100644 pkg/deployment/otlp.go diff --git a/pkg/deployment/all_in_one.go b/pkg/deployment/all_in_one.go index 7771912eb..4d7e753ef 100644 --- a/pkg/deployment/all_in_one.go +++ b/pkg/deployment/all_in_one.go @@ -63,7 +63,7 @@ func (a *AllInOne) Get() *appsv1.Deployment { commonSpec.Annotations["sidecar.istio.io/inject"] = "false" } - options := allArgs(a.jaeger.Spec.AllInOne.Options, + options := util.AllArgs(a.jaeger.Spec.AllInOne.Options, a.jaeger.Spec.Storage.Options.Filter(a.jaeger.Spec.Storage.Type.OptionsPrefix())) configmap.Update(a.jaeger, commonSpec, &options) @@ -113,6 +113,71 @@ func (a *AllInOne) Get() *appsv1.Deployment { strategy = *a.jaeger.Spec.AllInOne.Strategy } + envVars := []corev1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: string(a.jaeger.Spec.Storage.Type), + }, + { + Name: "METRICS_STORAGE_TYPE", + Value: string(a.jaeger.Spec.AllInOne.MetricsStorage.Type), + }, + { + Name: "COLLECTOR_ZIPKIN_HOST_PORT", + Value: ":9411", + }, + { + Name: "JAEGER_DISABLED", + Value: strconv.FormatBool(jaegerDisabled), + }, + } + + ports := []corev1.ContainerPort{ + { + ContainerPort: 5775, + Name: "zk-compact-trft", // max 15 chars! + Protocol: corev1.ProtocolUDP, + }, + { + ContainerPort: 5778, + Name: "config-rest", + }, + { + ContainerPort: 6831, + Name: "jg-compact-trft", + Protocol: corev1.ProtocolUDP, + }, + { + ContainerPort: 6832, + Name: "jg-binary-trft", + Protocol: corev1.ProtocolUDP, + }, + { + ContainerPort: 9411, + Name: "zipkin", + }, + { + ContainerPort: 14267, + Name: "c-tchan-trft", // for collector + }, + { + ContainerPort: 14268, + Name: "c-binary-trft", + }, + { + ContainerPort: 16686, + Name: "query", + }, + { + ContainerPort: adminPort, + Name: "admin-http", + }, + { + ContainerPort: 14250, + Name: "grpc", + }, + } + return &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", @@ -144,74 +209,13 @@ func (a *AllInOne) Get() *appsv1.Deployment { Spec: corev1.PodSpec{ ImagePullSecrets: commonSpec.ImagePullSecrets, Containers: []corev1.Container{{ - Image: util.ImageName(a.jaeger.Spec.AllInOne.Image, "jaeger-all-in-one-image"), - Name: "jaeger", - Args: options, - Env: []corev1.EnvVar{ - { - Name: "SPAN_STORAGE_TYPE", - Value: string(a.jaeger.Spec.Storage.Type), - }, - { - Name: "METRICS_STORAGE_TYPE", - Value: string(a.jaeger.Spec.AllInOne.MetricsStorage.Type), - }, - { - Name: "COLLECTOR_ZIPKIN_HOST_PORT", - Value: ":9411", - }, - { - Name: "JAEGER_DISABLED", - Value: strconv.FormatBool(jaegerDisabled), - }, - }, + Image: util.ImageName(a.jaeger.Spec.AllInOne.Image, "jaeger-all-in-one-image"), + Name: "jaeger", + Args: options, + Env: append(envVars, getOTLPEnvVars(options)...), VolumeMounts: commonSpec.VolumeMounts, EnvFrom: envFromSource, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 5775, - Name: "zk-compact-trft", // max 15 chars! - Protocol: corev1.ProtocolUDP, - }, - { - ContainerPort: 5778, - Name: "config-rest", - }, - { - ContainerPort: 6831, - Name: "jg-compact-trft", - Protocol: corev1.ProtocolUDP, - }, - { - ContainerPort: 6832, - Name: "jg-binary-trft", - Protocol: corev1.ProtocolUDP, - }, - { - ContainerPort: 9411, - Name: "zipkin", - }, - { - ContainerPort: 14267, - Name: "c-tchan-trft", // for collector - }, - { - ContainerPort: 14268, - Name: "c-binary-trft", - }, - { - ContainerPort: 16686, - Name: "query", - }, - { - ContainerPort: adminPort, - Name: "admin-http", - }, - { - ContainerPort: 14250, - Name: "grpc", - }, - }, + Ports: append(ports, getOTLPContainePorts(options)...), LivenessProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ diff --git a/pkg/deployment/all_in_one_test.go b/pkg/deployment/all_in_one_test.go index d475db260..1e330d26f 100644 --- a/pkg/deployment/all_in_one_test.go +++ b/pkg/deployment/all_in_one_test.go @@ -49,6 +49,10 @@ func TestDefaultAllInOneImage(t *testing.T) { Name: "JAEGER_DISABLED", Value: "false", }, + { + Name: "COLLECTOR_OTLP_ENABLED", + Value: "true", + }, } assert.Equal(t, envvars, d.Spec.Template.Spec.Containers[0].Env) } diff --git a/pkg/deployment/args.go b/pkg/deployment/args.go deleted file mode 100644 index 4ec9ba1f0..000000000 --- a/pkg/deployment/args.go +++ /dev/null @@ -1,13 +0,0 @@ -package deployment - -import ( - v1 "github.com/jaegertracing/jaeger-operator/apis/v1" -) - -func allArgs(optionsList ...v1.Options) []string { - args := []string{} - for _, options := range optionsList { - args = append(args, options.ToArgs()...) - } - return args -} diff --git a/pkg/deployment/args_test.go b/pkg/deployment/args_test.go deleted file mode 100644 index 2eb5ded2e..000000000 --- a/pkg/deployment/args_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package deployment - -import ( - "sort" - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/apimachinery/pkg/types" - - v1 "github.com/jaegertracing/jaeger-operator/apis/v1" -) - -func TestArgs(t *testing.T) { - // prepare - jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestArgs"}) - jaeger.Spec.Storage.Options = v1.NewOptions(map[string]interface{}{"memory.max-traces": 10000}) - jaeger.Spec.AllInOne.Options = v1.NewOptions(map[string]interface{}{"collector.http-port": 14268}) - - // test - args := allArgs(jaeger.Spec.Storage.Options, jaeger.Spec.AllInOne.Options) - - // verify - sort.Strings(args) - assert.Equal(t, "--collector.http-port=14268", args[0]) - assert.Equal(t, "--memory.max-traces=10000", args[1]) -} diff --git a/pkg/deployment/collector.go b/pkg/deployment/collector.go index 446ee3129..057edac8b 100644 --- a/pkg/deployment/collector.go +++ b/pkg/deployment/collector.go @@ -84,7 +84,7 @@ func (c *Collector) Get() *appsv1.Deployment { }) } } - options := allArgs(c.jaeger.Spec.Collector.Options, + options := util.AllArgs(c.jaeger.Spec.Collector.Options, c.jaeger.Spec.Storage.Options.Filter(storageType.OptionsPrefix())) sampling.Update(c.jaeger, commonSpec, &options) @@ -111,6 +111,40 @@ func (c *Collector) Get() *appsv1.Deployment { strategy = *c.jaeger.Spec.Collector.Strategy } + envVars := []corev1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: string(storageType), + }, + { + Name: "COLLECTOR_ZIPKIN_HOST_PORT", + Value: ":9411", + }, + } + + ports := []corev1.ContainerPort{ + { + ContainerPort: 9411, + Name: "zipkin", + }, + { + ContainerPort: 14267, + Name: "c-tchan-trft", // for collector + }, + { + ContainerPort: 14268, + Name: "c-binary-trft", + }, + { + ContainerPort: adminPort, + Name: "admin-http", + }, + { + ContainerPort: 14250, + Name: "grpc", + }, + } + return &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", @@ -143,43 +177,13 @@ func (c *Collector) Get() *appsv1.Deployment { Spec: corev1.PodSpec{ ImagePullSecrets: c.jaeger.Spec.ImagePullSecrets, Containers: []corev1.Container{{ - Image: util.ImageName(c.jaeger.Spec.Collector.Image, "jaeger-collector-image"), - Name: "jaeger-collector", - Args: options, - Env: []corev1.EnvVar{ - { - Name: "SPAN_STORAGE_TYPE", - Value: string(storageType), - }, - { - Name: "COLLECTOR_ZIPKIN_HOST_PORT", - Value: ":9411", - }, - }, + Image: util.ImageName(c.jaeger.Spec.Collector.Image, "jaeger-collector-image"), + Name: "jaeger-collector", + Args: options, + Env: append(envVars, getOTLPEnvVars(options)...), VolumeMounts: commonSpec.VolumeMounts, EnvFrom: envFromSource, - Ports: []corev1.ContainerPort{ - { - ContainerPort: 9411, - Name: "zipkin", - }, - { - ContainerPort: 14267, - Name: "c-tchan-trft", // for collector - }, - { - ContainerPort: 14268, - Name: "c-binary-trft", - }, - { - ContainerPort: adminPort, - Name: "admin-http", - }, - { - ContainerPort: 14250, - Name: "grpc", - }, - }, + Ports: append(ports, getOTLPContainePorts(options)...), LivenessProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ diff --git a/pkg/deployment/collector_test.go b/pkg/deployment/collector_test.go index 2a69dc2e5..0c297d68a 100644 --- a/pkg/deployment/collector_test.go +++ b/pkg/deployment/collector_test.go @@ -86,6 +86,10 @@ func TestDefaultCollectorImage(t *testing.T) { Name: "COLLECTOR_ZIPKIN_HOST_PORT", Value: ":9411", }, + { + Name: "COLLECTOR_OTLP_ENABLED", + Value: "true", + }, } assert.Equal(t, envvars, containers[0].Env) } @@ -379,6 +383,10 @@ func TestCollectorWithDirectStorageType(t *testing.T) { Name: "COLLECTOR_ZIPKIN_HOST_PORT", Value: ":9411", }, + { + Name: "COLLECTOR_OTLP_ENABLED", + Value: "true", + }, } assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 2) @@ -418,6 +426,10 @@ func TestCollectorWithKafkaStorageType(t *testing.T) { Name: "COLLECTOR_ZIPKIN_HOST_PORT", Value: ":9411", }, + { + Name: "COLLECTOR_OTLP_ENABLED", + Value: "true", + }, } assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3) @@ -453,6 +465,10 @@ func TestCollectorWithIngesterNoOptionsStorageType(t *testing.T) { Name: "COLLECTOR_ZIPKIN_HOST_PORT", Value: ":9411", }, + { + Name: "COLLECTOR_OTLP_ENABLED", + Value: "true", + }, } assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 2) diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index 1a0c3baee..401bfa044 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -85,7 +85,7 @@ func (i *Ingester) Get() *appsv1.Deployment { }) } - options := allArgs(i.jaeger.Spec.Ingester.Options, + options := util.AllArgs(i.jaeger.Spec.Ingester.Options, i.jaeger.Spec.Storage.Options.Filter(i.jaeger.Spec.Storage.Type.OptionsPrefix())) ca.Update(i.jaeger, commonSpec) diff --git a/pkg/deployment/otlp.go b/pkg/deployment/otlp.go new file mode 100644 index 000000000..68d49d0cf --- /dev/null +++ b/pkg/deployment/otlp.go @@ -0,0 +1,37 @@ +package deployment + +import ( + corev1 "k8s.io/api/core/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/util" +) + +const collectorOTLPEnvVarName = "COLLECTOR_OTLP_ENABLED" + +func getOTLPEnvVars(options []string) []corev1.EnvVar { + if !util.IsOTLPExplcitSet(options) { + return []corev1.EnvVar{ + { + Name: collectorOTLPEnvVarName, + Value: "true", + }, + } + } + return []corev1.EnvVar{} +} + +func getOTLPContainePorts(options []string) []corev1.ContainerPort { + if util.IsOTLPEnable(options) { + return []corev1.ContainerPort{ + { + ContainerPort: 4317, + Name: "otlp-grpc", + }, + { + ContainerPort: 4318, + Name: "otlp-http", + }, + } + } + return []corev1.ContainerPort{} +} diff --git a/pkg/deployment/query.go b/pkg/deployment/query.go index bedf919f6..20dd3f0a0 100644 --- a/pkg/deployment/query.go +++ b/pkg/deployment/query.go @@ -67,7 +67,7 @@ func (q *Query) Get() *appsv1.Deployment { commonSpec.Annotations["sidecar.istio.io/inject"] = "false" } - options := allArgs(q.jaeger.Spec.Query.Options, + options := util.AllArgs(q.jaeger.Spec.Query.Options, q.jaeger.Spec.Storage.Options.Filter(q.jaeger.Spec.Storage.Type.OptionsPrefix())) configmap.Update(q.jaeger, commonSpec, &options) diff --git a/pkg/service/collector.go b/pkg/service/collector.go index 615571994..4cca041f8 100644 --- a/pkg/service/collector.go +++ b/pkg/service/collector.go @@ -39,6 +39,27 @@ func clusteripCollectorService(jaeger *v1.Jaeger, selector map[string]string) *c func collectorService(jaeger *v1.Jaeger, selector map[string]string) *corev1.Service { trueVar := true + ports := []corev1.ServicePort{ + { + Name: "http-zipkin", + Port: 9411, + }, + { + Name: GetPortNameForGRPC(jaeger), + Port: 14250, + }, + { + Name: "http-c-tchan-trft", + Port: 14267, + }, + { + Name: "http-c-binary-trft", + Port: 14268, + }, + } + + ports = append(ports, getOTLPServicePorts(jaeger)...) + return &corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -59,27 +80,9 @@ func collectorService(jaeger *v1.Jaeger, selector map[string]string) *corev1.Ser Spec: corev1.ServiceSpec{ Selector: selector, ClusterIP: "", - Ports: []corev1.ServicePort{ - { - Name: "http-zipkin", - Port: 9411, - }, - { - Name: GetPortNameForGRPC(jaeger), - Port: 14250, - }, - { - Name: "http-c-tchan-trft", - Port: 14267, - }, - { - Name: "http-c-binary-trft", - Port: 14268, - }, - }, + Ports: ports, }, } - } // GetNameForCollectorService returns the service name for the collector in this Jaeger instance @@ -130,3 +133,23 @@ func getTypeForCollectorService(jaeger *v1.Jaeger) corev1.ServiceType { } return corev1.ServiceTypeClusterIP } + +func getOTLPServicePorts(jaeger *v1.Jaeger) []corev1.ServicePort { + options := util.AllArgs(jaeger.Spec.AllInOne.Options) + if jaeger.Spec.Strategy != v1.DeploymentStrategyAllInOne { + options = util.AllArgs(jaeger.Spec.Collector.Options) + } + if util.IsOTLPEnable(options) { + return []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 4317, + }, + { + Name: "otlp-http", + Port: 4318, + }, + } + } + return []corev1.ServicePort{} +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 3a11518c1..6a9c860c5 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -320,3 +320,35 @@ func GenerateProxySecret() (string, error) { return base64Secret, nil } + +// FindEnvVar return the EnvVar with given name or nil if not found +func FindEnvVar(envs []corev1.EnvVar, name string) *corev1.EnvVar { + for _, env := range envs { + if env.Name == name { + return &env + } + } + return nil +} + +// IsOTLPEnable return true if OTLP is enabled, this means --collector.otlp.enabled=true or abscense of flag, means is enabled by defaultr +func IsOTLPEnable(options []string) bool { + if IsOTLPExplcitSet(options) { + return len(FindItem("--collector.otlp.enabled=true", options)) != 0 + } + return true +} + +// IsOTLPExplcitSet return true if a flag for enable the otlp is set on the options +func IsOTLPExplcitSet(options []string) bool { + return len(FindItem("--collector.otlp.enabled=", options)) != 0 +} + +// AllArgs return slice of strings with all arguments +func AllArgs(optionsList ...v1.Options) []string { + args := []string{} + for _, options := range optionsList { + args = append(args, options.ToArgs()...) + } + return args +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 19d82bfc8..33fdfa2a9 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -1,6 +1,7 @@ package util import ( + "sort" "testing" "github.com/jaegertracing/jaeger-operator/pkg/version" @@ -628,3 +629,122 @@ func TestReplaceArgument(t *testing.T) { } } + +func TestArgs(t *testing.T) { + // prepare + jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestArgs"}) + jaeger.Spec.Storage.Options = v1.NewOptions(map[string]interface{}{"memory.max-traces": 10000}) + jaeger.Spec.AllInOne.Options = v1.NewOptions(map[string]interface{}{"collector.http-port": 14268}) + + // test + args := AllArgs(jaeger.Spec.Storage.Options, jaeger.Spec.AllInOne.Options) + + // verify + sort.Strings(args) + assert.Equal(t, "--collector.http-port=14268", args[0]) + assert.Equal(t, "--memory.max-traces=10000", args[1]) +} + +func TestFindEnvVars(t *testing.T) { + + myEnvVar := corev1.EnvVar{ + Name: "my_env_var", + Value: "v1", + } + + envVars := []corev1.EnvVar{ + myEnvVar, + { + Name: "other_env", + Value: "v2", + }, + } + + tests := []struct { + name string + envName string + expected *corev1.EnvVar + }{ + { + name: "env var found", + envName: "my_env_var", + expected: &myEnvVar, + }, + { + name: "env var found", + envName: "no_exist_env", + expected: nil, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := FindEnvVar(envVars, tc.envName) + assert.Equal(t, result, tc.expected) + }) + } +} + +func TestIsOTLPEnable(t *testing.T) { + + tests := []struct { + name string + options v1.Options + expected bool + }{ + { + name: "explicit set to true", + options: v1.NewOptions(map[string]interface{}{"collector.otlp.enabled": true}), + expected: true, + }, + { + name: "explicit set to false", + options: v1.NewOptions(map[string]interface{}{"collector.otlp.enabled": false}), + expected: false, + }, + { + name: "no present in options", + options: v1.NewOptions(map[string]interface{}{}), + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + enable := IsOTLPEnable(AllArgs(tc.options)) + assert.Equal(t, enable, tc.expected) + }) + } +} + +func TestIsOTLPExplcitSet(t *testing.T) { + + tests := []struct { + name string + options v1.Options + expected bool + }{ + { + name: "explicit set to true", + options: v1.NewOptions(map[string]interface{}{"collector.otlp.enabled": true}), + expected: true, + }, + { + name: "explicit set to false", + options: v1.NewOptions(map[string]interface{}{"collector.otlp.enabled": false}), + expected: true, + }, + { + name: "no present in options", + options: v1.NewOptions(map[string]interface{}{}), + expected: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + enable := IsOTLPExplcitSet(AllArgs(tc.options)) + assert.Equal(t, enable, tc.expected) + }) + } +}