diff --git a/Makefile b/Makefile index 8573305b7..8dec88c8a 100644 --- a/Makefile +++ b/Makefile @@ -115,6 +115,11 @@ e2e-tests-self-provisioned-es: prepare-e2e-tests deploy-es-operator @echo Running Self provisioned Elasticsearch end-to-end tests... @STORAGE_NAMESPACE=$(STORAGE_NAMESPACE) ES_OPERATOR_NAMESPACE=$(ES_OPERATOR_NAMESPACE) ES_OPERATOR_IMAGE=$(ES_OPERATOR_IMAGE) go test -tags=self_provisioned_elasticsearch ./test/e2e/... $(TEST_OPTIONS) +.PHONY: e2e-tests-self-provisioned-es-kafka +e2e-tests-self-provisioned-es-kafka: prepare-e2e-tests deploy-kafka-operator deploy-es-operator + @echo Running Self provisioned Elasticsearch and Kafka end-to-end tests... + @STORAGE_NAMESPACE=$(STORAGE_NAMESPACE) ES_OPERATOR_NAMESPACE=$(ES_OPERATOR_NAMESPACE) ES_OPERATOR_IMAGE=$(ES_OPERATOR_IMAGE) go test -tags=self_provisioned_elasticsearch_kafka ./test/e2e/... $(TEST_OPTIONS) + .PHONY: e2e-tests-streaming e2e-tests-streaming: prepare-e2e-tests es kafka @echo Running Streaming end-to-end tests... @@ -195,8 +200,8 @@ storage: @echo Creating namespace $(STORAGE_NAMESPACE) @kubectl create namespace $(STORAGE_NAMESPACE) 2>&1 | grep -v "already exists" || true -.PHONY: kafka -kafka: +.PHONY: deploy-kafka-operator +deploy-kafka-operator: @echo Creating namespace $(KAFKA_NAMESPACE) @kubectl create namespace $(KAFKA_NAMESPACE) 2>&1 | grep -v "already exists" || true ifeq ($(OLM),true) @@ -209,12 +214,9 @@ else @sed 's/namespace: .*/namespace: $(KAFKA_NAMESPACE)/' deploy/test/kafka-operator.yaml | kubectl -n $(KAFKA_NAMESPACE) apply -f - 2>&1 | grep -v "already exists" || true @kubectl set env deployment strimzi-cluster-operator -n ${KAFKA_NAMESPACE} STRIMZI_NAMESPACE="*" endif - @curl --location $(KAFKA_EXAMPLE) --output deploy/test/kafka-example.yaml - @kubectl -n $(KAFKA_NAMESPACE) apply -f deploy/test/kafka-example.yaml 2>&1 | grep -v "already exists" || true -.PHONY: undeploy-kafka -undeploy-kafka: - @kubectl delete --namespace $(KAFKA_NAMESPACE) -f deploy/test/kafka-example.yaml 2>&1 || true +.PHONY: undeploy-kafka-operator +undeploy-kafka-operator: ifeq ($(OLM),true) @echo Skiping kafka-operator undeploy else @@ -225,6 +227,17 @@ else endif @kubectl delete namespace $(KAFKA_NAMESPACE) 2>&1 || true +.PHONY: kafka +kafka: deploy-kafka-operator + @echo Creating namespace $(KAFKA_NAMESPACE) + @kubectl create namespace $(KAFKA_NAMESPACE) 2>&1 | grep -v "already exists" || true + @curl --location $(KAFKA_EXAMPLE) --output deploy/test/kafka-example.yaml + @kubectl -n $(KAFKA_NAMESPACE) apply -f deploy/test/kafka-example.yaml 2>&1 | grep -v "already exists" || true + +.PHONY: undeploy-kafka +undeploy-kafka: undeploy-kafka-operator + @kubectl delete --namespace $(KAFKA_NAMESPACE) -f deploy/test/kafka-example.yaml 2>&1 || true + .PHONY: clean clean: undeploy-kafka undeploy-es-operator @rm -f deploy/test/*.yaml diff --git a/pkg/controller/jaeger/jaeger_controller.go b/pkg/controller/jaeger/jaeger_controller.go index 2a5dafd9b..dc88b8d93 100644 --- a/pkg/controller/jaeger/jaeger_controller.go +++ b/pkg/controller/jaeger/jaeger_controller.go @@ -25,6 +25,7 @@ import ( v1 "github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1" "github.com/jaegertracing/jaeger-operator/pkg/autodetect" + "github.com/jaegertracing/jaeger-operator/pkg/storage" "github.com/jaegertracing/jaeger-operator/pkg/strategy" "github.com/jaegertracing/jaeger-operator/pkg/tracing" ) @@ -161,20 +162,7 @@ func (r *ReconcileJaeger) Reconcile(request reconcile.Request) (reconcile.Result originalInstance := *instance - opts := client.MatchingLabels(map[string]string{ - "app.kubernetes.io/instance": instance.Name, - "app.kubernetes.io/managed-by": "jaeger-operator", - }) - list := &corev1.SecretList{} - if err := r.client.List(ctx, list, opts); err != nil { - instance.Status.Phase = v1.JaegerPhaseFailed - if err := r.client.Status().Update(ctx, instance); err != nil { - // we let it return the real error later - logFields.WithError(err).Error("failed to store the failed status into the current CustomResource after preconditions") - } - return reconcile.Result{}, tracing.HandleError(err, span) - } - str := r.runStrategyChooser(ctx, instance, list.Items) + str := r.runStrategyChooser(ctx, instance) updated, err := r.apply(ctx, *instance, str) if err != nil { @@ -226,16 +214,16 @@ func validate(jaeger *v1.Jaeger) error { return nil } -func (r *ReconcileJaeger) runStrategyChooser(ctx context.Context, instance *v1.Jaeger, secrets []corev1.Secret) strategy.S { +func (r *ReconcileJaeger) runStrategyChooser(ctx context.Context, instance *v1.Jaeger) strategy.S { if nil == r.strategyChooser { - return defaultStrategyChooser(ctx, instance, secrets) + return defaultStrategyChooser(ctx, instance) } return r.strategyChooser(ctx, instance) } -func defaultStrategyChooser(ctx context.Context, instance *v1.Jaeger, secrets []corev1.Secret) strategy.S { - return strategy.For(ctx, instance, secrets) +func defaultStrategyChooser(ctx context.Context, instance *v1.Jaeger) strategy.S { + return strategy.For(ctx, instance) } func (r *ReconcileJaeger) apply(ctx context.Context, jaeger v1.Jaeger, str strategy.S) (v1.Jaeger, error) { @@ -248,6 +236,31 @@ func (r *ReconcileJaeger) apply(ctx context.Context, jaeger v1.Jaeger, str strat return jaeger, tracing.HandleError(err, span) } + // ES cert handling requires secrets from environment + // therefore running this here and not in the strategy + if storage.ShouldDeployElasticsearch(jaeger.Spec.Storage) { + opts := client.MatchingLabels(map[string]string{ + "app.kubernetes.io/instance": jaeger.Name, + "app.kubernetes.io/managed-by": "jaeger-operator", + }) + secrets := &corev1.SecretList{} + if err := r.client.List(ctx, secrets, opts); err != nil { + jaeger.Status.Phase = v1.JaegerPhaseFailed + if err := r.client.Status().Update(ctx, &jaeger); err != nil { + // we let it return the real error later + jaeger.Logger().WithError(err).Error("failed to store the failed status into the current CustomResource after preconditions") + } + return jaeger, tracing.HandleError(err, span) + } + es := &storage.ElasticsearchDeployment{Jaeger: &jaeger, CertScript: "./scripts/cert_generation.sh", Secrets: secrets.Items} + err = es.CreateCerts() + if err != nil { + es.Jaeger.Logger().WithError(err).Error("failed to create Elasticsearch certificates, Elasticsearch won't be deployed") + return jaeger, err + } + str = str.WithSecrets(append(str.Secrets(), es.ExtractSecrets()...)) + } + // secrets have to be created before ES - they are mounted to the ES pod if err := r.applySecrets(ctx, jaeger, str.Secrets()); err != nil { return jaeger, tracing.HandleError(err, span) diff --git a/pkg/strategy/controller.go b/pkg/strategy/controller.go index 7ea4c32dd..5a448995a 100644 --- a/pkg/strategy/controller.go +++ b/pkg/strategy/controller.go @@ -29,7 +29,7 @@ var ( ) // For returns the appropriate Strategy for the given Jaeger instance -func For(ctx context.Context, jaeger *v1.Jaeger, secrets []corev1.Secret) S { +func For(ctx context.Context, jaeger *v1.Jaeger) S { tracer := global.TraceProvider().GetTracer(v1.ReconciliationTracer) ctx, span := tracer.Start(ctx, "strategy.For") defer span.End() @@ -50,8 +50,7 @@ func For(ctx context.Context, jaeger *v1.Jaeger, secrets []corev1.Secret) S { return newStreamingStrategy(ctx, jaeger) } - es := &storage.ElasticsearchDeployment{Jaeger: jaeger, CertScript: esCertGenerationScript, Secrets: secrets} - return newProductionStrategy(ctx, jaeger, es) + return newProductionStrategy(ctx, jaeger) } // normalize changes the incoming Jaeger object so that the defaults are applied when diff --git a/pkg/strategy/controller_test.go b/pkg/strategy/controller_test.go index c8af46a3d..46f0af1c6 100644 --- a/pkg/strategy/controller_test.go +++ b/pkg/strategy/controller_test.go @@ -15,7 +15,7 @@ import ( func TestNewControllerForAllInOneAsDefault(t *testing.T) { jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestNewControllerForAllInOneAsDefault"}) - ctrl := For(context.TODO(), jaeger, []corev1.Secret{}) + ctrl := For(context.TODO(), jaeger) assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyAllInOne) } @@ -23,7 +23,7 @@ func TestNewControllerForAllInOneAsExplicitValue(t *testing.T) { jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestNewControllerForAllInOneAsExplicitValue"}) jaeger.Spec.Strategy = v1.DeploymentStrategyDeprecatedAllInOne // same as 'all-in-one' - ctrl := For(context.TODO(), jaeger, []corev1.Secret{}) + ctrl := For(context.TODO(), jaeger) assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyAllInOne) } @@ -32,7 +32,7 @@ func TestNewControllerForProduction(t *testing.T) { jaeger.Spec.Strategy = v1.DeploymentStrategyProduction jaeger.Spec.Storage.Type = "elasticsearch" - ctrl := For(context.TODO(), jaeger, []corev1.Secret{}) + ctrl := For(context.TODO(), jaeger) assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyProduction) } @@ -51,7 +51,7 @@ func TestElasticsearchAsStorageOptions(t *testing.T) { "es.server-urls": "http://elasticsearch-example-es-cluster:9200", }) - ctrl := For(context.TODO(), jaeger, []corev1.Secret{}) + ctrl := For(context.TODO(), jaeger) deps := ctrl.Deployments() assert.Len(t, deps, 2) // query and collector, for a production setup counter := 0 @@ -117,7 +117,7 @@ func TestDeprecatedAllInOneStrategy(t *testing.T) { Strategy: v1.DeploymentStrategyDeprecatedAllInOne, }, } - For(context.TODO(), jaeger, []corev1.Secret{}) + For(context.TODO(), jaeger) assert.Equal(t, v1.DeploymentStrategyAllInOne, jaeger.Spec.Strategy) } @@ -130,7 +130,7 @@ func TestStorageMemoryOnlyUsedWithAllInOneStrategy(t *testing.T) { }, }, } - For(context.TODO(), jaeger, []corev1.Secret{}) + For(context.TODO(), jaeger) assert.Equal(t, v1.DeploymentStrategyAllInOne, jaeger.Spec.Strategy) } diff --git a/pkg/strategy/production.go b/pkg/strategy/production.go index abb34a3c6..6af0938d5 100644 --- a/pkg/strategy/production.go +++ b/pkg/strategy/production.go @@ -4,6 +4,8 @@ import ( "context" "strings" + corev1 "k8s.io/api/core/v1" + "github.com/spf13/viper" "go.opentelemetry.io/otel/global" appsv1 "k8s.io/api/apps/v1" @@ -22,7 +24,7 @@ import ( "github.com/jaegertracing/jaeger-operator/pkg/storage" ) -func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.ElasticsearchDeployment) S { +func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger) S { tracer := global.TraceProvider().GetTracer(v1.ReconciliationTracer) ctx, span := tracer.Start(ctx, "newProductionStrategy") defer span.End() @@ -104,25 +106,17 @@ func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.E // assembles the pieces for an elasticsearch self-provisioned deployment via the elasticsearch operator if storage.ShouldDeployElasticsearch(jaeger.Spec.Storage) { - err := es.CreateCerts() - if err != nil { - jaeger.Logger().WithError(err).Error("failed to create Elasticsearch certificates, Elasticsearch won't be deployed") - } else { - c.secrets = es.ExtractSecrets() - c.elasticsearches = append(c.elasticsearches, *es.Elasticsearch()) - - es.InjectStorageConfiguration(&queryDep.Spec.Template.Spec) - es.InjectStorageConfiguration(&cDep.Spec.Template.Spec) - if indexCleaner != nil { - es.InjectSecretsConfiguration(&indexCleaner.Spec.JobTemplate.Spec.Template.Spec) - } - for i := range esRollover { - es.InjectSecretsConfiguration(&esRollover[i].Spec.JobTemplate.Spec.Template.Spec) - } - for i := range c.dependencies { - es.InjectSecretsConfiguration(&c.dependencies[i].Spec.Template.Spec) - } + var jobs []*corev1.PodSpec + for i := range c.dependencies { + jobs = append(jobs, &c.dependencies[i].Spec.Template.Spec) + } + if indexCleaner != nil { + jobs = append(jobs, &indexCleaner.Spec.JobTemplate.Spec.Template.Spec) + } + for i := range esRollover { + jobs = append(jobs, &esRollover[i].Spec.JobTemplate.Spec.Template.Spec) } + autoProvisionElasticsearch(&c, jaeger, jobs, []*appsv1.Deployment{queryDep, cDep}) } // the index cleaner ES job, which may have been changed by the ES self-provisioning routine @@ -138,3 +132,14 @@ func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.E return c } + +func autoProvisionElasticsearch(manifest *S, jaeger *v1.Jaeger, curatorPods []*corev1.PodSpec, deployments []*appsv1.Deployment) { + es := &storage.ElasticsearchDeployment{Jaeger: jaeger} + for i := range deployments { + es.InjectStorageConfiguration(&deployments[i].Spec.Template.Spec) + } + for _, pod := range curatorPods { + es.InjectSecretsConfiguration(pod) + } + manifest.elasticsearches = append(manifest.elasticsearches, *es.Elasticsearch()) +} diff --git a/pkg/strategy/production_test.go b/pkg/strategy/production_test.go index cf6d45570..0e7bf291c 100644 --- a/pkg/strategy/production_test.go +++ b/pkg/strategy/production_test.go @@ -8,7 +8,6 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -23,7 +22,7 @@ func init() { func TestCreateProductionDeployment(t *testing.T) { name := "TestCreateProductionDeployment" - c := newProductionStrategy(context.Background(), v1.NewJaeger(types.NamespacedName{Name: name}), &storage.ElasticsearchDeployment{}) + c := newProductionStrategy(context.Background(), v1.NewJaeger(types.NamespacedName{Name: name})) assertDeploymentsAndServicesForProduction(t, name, c, false, false, false) } @@ -35,7 +34,7 @@ func TestCreateProductionDeploymentOnOpenShift(t *testing.T) { jaeger := v1.NewJaeger(types.NamespacedName{Name: name}) normalize(context.Background(), jaeger) - c := newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{}) + c := newProductionStrategy(context.Background(), jaeger) assertDeploymentsAndServicesForProduction(t, name, c, false, true, false) } @@ -45,7 +44,7 @@ func TestCreateProductionDeploymentWithDaemonSetAgent(t *testing.T) { j := v1.NewJaeger(types.NamespacedName{Name: name}) j.Spec.Agent.Strategy = "DaemonSet" - c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{}) + c := newProductionStrategy(context.Background(), j) assertDeploymentsAndServicesForProduction(t, name, c, true, false, false) } @@ -59,7 +58,7 @@ func TestCreateProductionDeploymentWithUIConfigMap(t *testing.T) { }, }) - c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{}) + c := newProductionStrategy(context.Background(), j) assertDeploymentsAndServicesForProduction(t, name, c, false, false, true) } @@ -86,7 +85,7 @@ func TestOptionsArePassed(t *testing.T) { }, } - ctrl := For(context.Background(), jaeger, []corev1.Secret{}) + ctrl := For(context.Background(), jaeger) deployments := ctrl.Deployments() for _, dep := range deployments { args := dep.Spec.Template.Spec.Containers[0].Args @@ -110,7 +109,7 @@ func TestDelegateProductionDependencies(t *testing.T) { // for now, we just have storage dependencies j := v1.NewJaeger(types.NamespacedName{Name: "TestDelegateProductionDependencies"}) j.Spec.Storage.Type = "cassandra" - c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{}) + c := newProductionStrategy(context.Background(), j) assert.Equal(t, c.Dependencies(), storage.Dependencies(j)) } @@ -165,19 +164,19 @@ func assertDeploymentsAndServicesForProduction(t *testing.T, name string, s S, h func TestSparkDependenciesProduction(t *testing.T) { testSparkDependencies(t, func(jaeger *v1.Jaeger) S { - return newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{Jaeger: jaeger}) + return newProductionStrategy(context.Background(), jaeger) }) } func TestEsIndexCleanerProduction(t *testing.T) { testEsIndexCleaner(t, func(jaeger *v1.Jaeger) S { - return newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{Jaeger: jaeger}) + return newProductionStrategy(context.Background(), jaeger) }) } func TestAgentSidecarIsInjectedIntoQueryForStreamingForProduction(t *testing.T) { j := v1.NewJaeger(types.NamespacedName{Name: "TestAgentSidecarIsInjectedIntoQueryForStreamingForProduction"}) - c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{}) + c := newProductionStrategy(context.Background(), j) for _, dep := range c.Deployments() { if strings.HasSuffix(dep.Name, "-query") { assert.Equal(t, 2, len(dep.Spec.Template.Spec.Containers)) @@ -194,11 +193,7 @@ func TestElasticsearchInject(t *testing.T) { j.Spec.Storage.EsIndexCleaner.Enabled = &verdad j.Spec.Storage.EsIndexCleaner.NumberOfDays = &one j.Spec.Storage.Options = v1.NewOptions(map[string]interface{}{"es.use-aliases": true}) - es := &storage.ElasticsearchDeployment{Jaeger: j, CertScript: "../../scripts/cert_generation.sh"} - err := es.CleanCerts() - require.NoError(t, err) - defer es.CleanCerts() - c := newProductionStrategy(context.Background(), j, es) + c := newProductionStrategy(context.Background(), j) // there should be index-cleaner, rollover, lookback assert.Equal(t, 3, len(c.cronJobs)) assertEsInjectSecrets(t, c.cronJobs[0].Spec.JobTemplate.Spec.Template.Spec) diff --git a/pkg/strategy/streaming.go b/pkg/strategy/streaming.go index 3f6e1e72f..006517dc9 100644 --- a/pkg/strategy/streaming.go +++ b/pkg/strategy/streaming.go @@ -5,6 +5,8 @@ import ( "fmt" "strings" + batchv1beta1 "k8s.io/api/batch/v1beta1" + "github.com/spf13/viper" "go.opentelemetry.io/otel/global" appsv1 "k8s.io/api/apps/v1" @@ -65,13 +67,6 @@ func newStreamingStrategy(ctx context.Context, jaeger *v1.Jaeger) S { manifest = autoProvisionKafka(ctx, jaeger, manifest) } - // add the deployments - manifest.deployments = []appsv1.Deployment{*collector.Get(), *inject.Sidecar(jaeger, inject.OAuthProxy(jaeger, query.Get()))} - - if d := ingester.Get(); d != nil { - manifest.deployments = append(manifest.deployments, *d) - } - // add the daemonsets if ds := agent.Get(); ds != nil { manifest.daemonSets = []appsv1.DaemonSet{*ds} @@ -105,16 +100,60 @@ func newStreamingStrategy(ctx context.Context, jaeger *v1.Jaeger) S { } } + var indexCleaner *batchv1beta1.CronJob if isBoolTrue(jaeger.Spec.Storage.EsIndexCleaner.Enabled) { if strings.EqualFold(jaeger.Spec.Storage.Type, "elasticsearch") { - manifest.cronJobs = append(manifest.cronJobs, *cronjob.CreateEsIndexCleaner(jaeger)) + indexCleaner = cronjob.CreateEsIndexCleaner(jaeger) } else { jaeger.Logger().WithField("type", jaeger.Spec.Storage.Type).Warn("Skipping Elasticsearch index cleaner job due to unsupported storage.") } } + var esRollover []batchv1beta1.CronJob + if storage.EnableRollover(jaeger.Spec.Storage) { + esRollover = cronjob.CreateRollover(jaeger) + } + + // prepare the deployments, which may get changed by the elasticsearch routine + cDep := collector.Get() + queryDep := inject.Sidecar(jaeger, inject.OAuthProxy(jaeger, query.Get())) + var ingesterDep *appsv1.Deployment + if d := ingester.Get(); d != nil { + ingesterDep = d + } manifest.dependencies = storage.Dependencies(jaeger) + // assembles the pieces for an elasticsearch self-provisioned deployment via the elasticsearch operator + if storage.ShouldDeployElasticsearch(jaeger.Spec.Storage) { + var jobs []*corev1.PodSpec + for i := range manifest.dependencies { + jobs = append(jobs, &manifest.dependencies[i].Spec.Template.Spec) + } + if indexCleaner != nil { + jobs = append(jobs, &indexCleaner.Spec.JobTemplate.Spec.Template.Spec) + } + for i := range esRollover { + jobs = append(jobs, &esRollover[i].Spec.JobTemplate.Spec.Template.Spec) + } + deps := []*appsv1.Deployment{queryDep} + if ingesterDep != nil { + deps = append(deps, ingesterDep) + } + autoProvisionElasticsearch(&manifest, jaeger, jobs, deps) + } + manifest.deployments = []appsv1.Deployment{*cDep, *queryDep} + if ingesterDep != nil { + manifest.deployments = append(manifest.deployments, *ingesterDep) + } + + // the index cleaner ES job, which may have been changed by the ES self-provisioning routine + if indexCleaner != nil { + manifest.cronJobs = append(manifest.cronJobs, *indexCleaner) + } + if len(esRollover) > 0 { + manifest.cronJobs = append(manifest.cronJobs, esRollover...) + } + return manifest } diff --git a/pkg/strategy/streaming_test.go b/pkg/strategy/streaming_test.go index 5d06d8f24..21f4cb15c 100644 --- a/pkg/strategy/streaming_test.go +++ b/pkg/strategy/streaming_test.go @@ -131,7 +131,7 @@ func TestStreamingOptionsArePassed(t *testing.T) { }, } - ctrl := For(context.TODO(), jaeger, []corev1.Secret{}) + ctrl := For(context.TODO(), jaeger) deployments := ctrl.Deployments() for _, dep := range deployments { args := dep.Spec.Template.Spec.Containers[0].Args @@ -348,3 +348,35 @@ func TestReplaceVolumeMount(t *testing.T) { } assert.Equal(t, 2, found) } + +func TestAutoProvisionedKafkaAndElasticsearch(t *testing.T) { + verdad := true + one := int(1) + jaeger := v1.NewJaeger(types.NamespacedName{Name: t.Name()}) + jaeger.Spec.Storage.Type = "elasticsearch" + jaeger.Spec.Storage.EsIndexCleaner.Enabled = &verdad + jaeger.Spec.Storage.EsIndexCleaner.NumberOfDays = &one + jaeger.Spec.Storage.Options = v1.NewOptions(map[string]interface{}{"es.use-aliases": true}) + + c := newStreamingStrategy(context.Background(), jaeger) + // there should be index-cleaner, rollover, lookback + assert.Equal(t, 3, len(c.cronJobs)) + assertEsInjectSecretsStreaming(t, c.cronJobs[0].Spec.JobTemplate.Spec.Template.Spec) + assertEsInjectSecretsStreaming(t, c.cronJobs[1].Spec.JobTemplate.Spec.Template.Spec) + assertEsInjectSecretsStreaming(t, c.cronJobs[2].Spec.JobTemplate.Spec.Template.Spec) +} + +func assertEsInjectSecretsStreaming(t *testing.T, p corev1.PodSpec) { + // first two volumes are from the common spec + assert.Equal(t, 3, len(p.Volumes)) + assert.Equal(t, "certs", p.Volumes[2].Name) + assert.Equal(t, "certs", p.Containers[0].VolumeMounts[2].Name) + envs := map[string]corev1.EnvVar{} + for _, e := range p.Containers[0].Env { + envs[e.Name] = e + } + assert.Contains(t, envs, "ES_TLS") + assert.Contains(t, envs, "ES_TLS_CA") + assert.Contains(t, envs, "ES_TLS_KEY") + assert.Contains(t, envs, "ES_TLS_CERT") +} diff --git a/test/e2e/self_provisioned_elasticsearch_kafka_test.go b/test/e2e/self_provisioned_elasticsearch_kafka_test.go new file mode 100644 index 000000000..6717b62f2 --- /dev/null +++ b/test/e2e/self_provisioned_elasticsearch_kafka_test.go @@ -0,0 +1,127 @@ +// +build self_provisioned_elasticsearch_kafka + +package e2e + +import ( + goctx "context" + "testing" + + kafkav1beta1 "github.com/jaegertracing/jaeger-operator/pkg/apis/kafka/v1beta1" + + framework "github.com/operator-framework/operator-sdk/pkg/test" + "github.com/operator-framework/operator-sdk/pkg/test/e2eutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis" + v1 "github.com/jaegertracing/jaeger-operator/pkg/apis/jaegertracing/v1" + esv1 "github.com/jaegertracing/jaeger-operator/pkg/storage/elasticsearch/v1" +) + +type SelfProvisionedTestSuite struct { + suite.Suite +} + +func (suite *SelfProvisionedTestSuite) SetupSuite() { + t = suite.T() + if !isOpenShift(t) { + t.Skipf("Test %s is currently supported only on OpenShift because es-operator runs only on OpenShift\n", t.Name()) + } + + assert.NoError(t, framework.AddToFrameworkScheme(apis.AddToScheme, &v1.JaegerList{ + TypeMeta: metav1.TypeMeta{ + Kind: "Jaeger", + APIVersion: "jaegertracing.io/v1", + }, + })) + assert.NoError(t, framework.AddToFrameworkScheme(apis.AddToScheme, &esv1.ElasticsearchList{ + TypeMeta: metav1.TypeMeta{ + Kind: "Elasticsearch", + APIVersion: "logging.openshift.io/v1", + }, + })) + assert.NoError(t, framework.AddToFrameworkScheme(apis.AddToScheme, &kafkav1beta1.KafkaList{ + TypeMeta: metav1.TypeMeta{ + Kind: "Kafka", + APIVersion: "kafka.strimzi.io/v1beta1", + }, + })) + + var err error + ctx, err = prepare(t) + if err != nil { + if ctx != nil { + ctx.Cleanup() + } + require.FailNow(t, "Failed in prepare") + } + fw = framework.Global + namespace, _ = ctx.GetNamespace() + require.NotNil(t, namespace, "GetNamespace failed") +} + +func (suite *SelfProvisionedTestSuite) TearDownSuite() { + handleSuiteTearDown() +} + +func TestSelfProvisionedSuite(t *testing.T) { + suite.Run(t, new(SelfProvisionedTestSuite)) +} + +func (suite *SelfProvisionedTestSuite) SetupTest() { + t = suite.T() +} + +func (suite *SelfProvisionedTestSuite) AfterTest(suiteName, testName string) { + handleTestFailure() +} + +func (suite *SelfProvisionedTestSuite) TestSelfProvisionedESAndKafkaSmokeTest() { + // create jaeger custom resource + jaegerInstanceName := "simple-prod" + exampleJaeger := getJaegerSelfProvisionedESAndKafka(jaegerInstanceName) + err := fw.Client.Create(goctx.TODO(), exampleJaeger, &framework.CleanupOptions{TestContext: ctx, Timeout: timeout, RetryInterval: retryInterval}) + require.NoError(t, err, "Error deploying example Jaeger") + defer undeployJaegerInstance(exampleJaeger) + + err = e2eutil.WaitForDeployment(t, fw.KubeClient, namespace, jaegerInstanceName+"-collector", 1, retryInterval, timeout*3) + require.NoError(t, err, "Error waiting for collector deployment") + + err = e2eutil.WaitForDeployment(t, fw.KubeClient, namespace, jaegerInstanceName+"-query", 1, retryInterval, timeout) + require.NoError(t, err, "Error waiting for query deployment") + + err = WaitForDeployment(t, fw.KubeClient, namespace, jaegerInstanceName+"-ingester", 1, retryInterval, timeout) + require.NoError(t, err, "Error waiting for ingester deployment") + + ProductionSmokeTest(jaegerInstanceName) +} + +func getJaegerSelfProvisionedESAndKafka(instanceName string) *v1.Jaeger { + return &v1.Jaeger{ + TypeMeta: metav1.TypeMeta{ + Kind: "Jaeger", + APIVersion: "jaegertracing.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: instanceName, + Namespace: namespace, + }, + Spec: v1.JaegerSpec{ + Strategy: v1.DeploymentStrategyStreaming, + Storage: v1.JaegerStorageSpec{ + Type: "elasticsearch", + Elasticsearch: v1.ElasticsearchSpec{ + NodeCount: 1, + Resources: &corev1.ResourceRequirements{ + Limits: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("1Gi")}, + Requests: corev1.ResourceList{corev1.ResourceMemory: resource.MustParse("1Gi")}, + }, + }, + }, + }, + } +}