Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support self provisioned ES in streaming strategy #842

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ e2e-tests-self-provisioned-es: prepare-e2e-tests deploy-es-operator
@echo Running Self provisioned Elasticsearch end-to-end tests...
@STORAGE_NAMESPACE=$(STORAGE_NAMESPACE) ES_OPERATOR_NAMESPACE=$(ES_OPERATOR_NAMESPACE) ES_OPERATOR_IMAGE=$(ES_OPERATOR_IMAGE) go test -tags=self_provisioned_elasticsearch ./test/e2e/... $(TEST_OPTIONS)

.PHONY: e2e-tests-self-provisioned-es-kafka
e2e-tests-self-provisioned-es-kafka: prepare-e2e-tests deploy-kafka-operator deploy-es-operator
@echo Running Self provisioned Elasticsearch and Kafka end-to-end tests...
@STORAGE_NAMESPACE=$(STORAGE_NAMESPACE) ES_OPERATOR_NAMESPACE=$(ES_OPERATOR_NAMESPACE) ES_OPERATOR_IMAGE=$(ES_OPERATOR_IMAGE) go test -tags=self_provisioned_elasticsearch_kafka ./test/e2e/... $(TEST_OPTIONS)

.PHONY: e2e-tests-streaming
e2e-tests-streaming: prepare-e2e-tests es kafka
@echo Running Streaming end-to-end tests...
Expand Down Expand Up @@ -195,8 +200,8 @@ storage:
@echo Creating namespace $(STORAGE_NAMESPACE)
@kubectl create namespace $(STORAGE_NAMESPACE) 2>&1 | grep -v "already exists" || true

.PHONY: kafka
kafka:
.PHONY: deploy-kafka-operator
deploy-kafka-operator:
@echo Creating namespace $(KAFKA_NAMESPACE)
@kubectl create namespace $(KAFKA_NAMESPACE) 2>&1 | grep -v "already exists" || true
ifeq ($(OLM),true)
Expand All @@ -209,12 +214,9 @@ else
@sed 's/namespace: .*/namespace: $(KAFKA_NAMESPACE)/' deploy/test/kafka-operator.yaml | kubectl -n $(KAFKA_NAMESPACE) apply -f - 2>&1 | grep -v "already exists" || true
@kubectl set env deployment strimzi-cluster-operator -n ${KAFKA_NAMESPACE} STRIMZI_NAMESPACE="*"
endif
@curl --location $(KAFKA_EXAMPLE) --output deploy/test/kafka-example.yaml
@kubectl -n $(KAFKA_NAMESPACE) apply -f deploy/test/kafka-example.yaml 2>&1 | grep -v "already exists" || true

.PHONY: undeploy-kafka
undeploy-kafka:
@kubectl delete --namespace $(KAFKA_NAMESPACE) -f deploy/test/kafka-example.yaml 2>&1 || true
.PHONY: undeploy-kafka-operator
undeploy-kafka-operator:
ifeq ($(OLM),true)
@echo Skiping kafka-operator undeploy
else
Expand All @@ -225,6 +227,17 @@ else
endif
@kubectl delete namespace $(KAFKA_NAMESPACE) 2>&1 || true

.PHONY: kafka
kafka: deploy-kafka-operator
@echo Creating namespace $(KAFKA_NAMESPACE)
@kubectl create namespace $(KAFKA_NAMESPACE) 2>&1 | grep -v "already exists" || true
@curl --location $(KAFKA_EXAMPLE) --output deploy/test/kafka-example.yaml
@kubectl -n $(KAFKA_NAMESPACE) apply -f deploy/test/kafka-example.yaml 2>&1 | grep -v "already exists" || true

.PHONY: undeploy-kafka
undeploy-kafka: undeploy-kafka-operator
@kubectl delete --namespace $(KAFKA_NAMESPACE) -f deploy/test/kafka-example.yaml 2>&1 || true

.PHONY: clean
clean: undeploy-kafka undeploy-es-operator
@rm -f deploy/test/*.yaml
Expand Down
50 changes: 32 additions & 18 deletions pkg/controller/jaeger/jaeger_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/jaegertracing/jaeger-operator/pkg/storage"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was it placed here by make format?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

It either places it after core imports or after 3rd party imports in a separate block. However when I put it into operator imports it leaves it there.


"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -161,20 +163,7 @@ func (r *ReconcileJaeger) Reconcile(request reconcile.Request) (reconcile.Result

originalInstance := *instance

opts := client.MatchingLabels(map[string]string{
"app.kubernetes.io/instance": instance.Name,
"app.kubernetes.io/managed-by": "jaeger-operator",
})
list := &corev1.SecretList{}
if err := r.client.List(ctx, list, opts); err != nil {
instance.Status.Phase = v1.JaegerPhaseFailed
if err := r.client.Status().Update(ctx, instance); err != nil {
// we let it return the real error later
logFields.WithError(err).Error("failed to store the failed status into the current CustomResource after preconditions")
}
return reconcile.Result{}, tracing.HandleError(err, span)
}
str := r.runStrategyChooser(ctx, instance, list.Items)
str := r.runStrategyChooser(ctx, instance)

updated, err := r.apply(ctx, *instance, str)
if err != nil {
Expand Down Expand Up @@ -226,16 +215,16 @@ func validate(jaeger *v1.Jaeger) error {
return nil
}

func (r *ReconcileJaeger) runStrategyChooser(ctx context.Context, instance *v1.Jaeger, secrets []corev1.Secret) strategy.S {
func (r *ReconcileJaeger) runStrategyChooser(ctx context.Context, instance *v1.Jaeger) strategy.S {
if nil == r.strategyChooser {
return defaultStrategyChooser(ctx, instance, secrets)
return defaultStrategyChooser(ctx, instance)
}

return r.strategyChooser(ctx, instance)
}

func defaultStrategyChooser(ctx context.Context, instance *v1.Jaeger, secrets []corev1.Secret) strategy.S {
return strategy.For(ctx, instance, secrets)
func defaultStrategyChooser(ctx context.Context, instance *v1.Jaeger) strategy.S {
return strategy.For(ctx, instance)
}

func (r *ReconcileJaeger) apply(ctx context.Context, jaeger v1.Jaeger, str strategy.S) (v1.Jaeger, error) {
Expand All @@ -248,6 +237,31 @@ func (r *ReconcileJaeger) apply(ctx context.Context, jaeger v1.Jaeger, str strat
return jaeger, tracing.HandleError(err, span)
}

// ES cert handling requires secretes from environment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/secretes/secrets

// therefore running this here and not in the strategy
if storage.ShouldDeployElasticsearch(jaeger.Spec.Storage) {
opts := client.MatchingLabels(map[string]string{
"app.kubernetes.io/instance": jaeger.Name,
"app.kubernetes.io/managed-by": "jaeger-operator",
})
secrets := &corev1.SecretList{}
if err := r.client.List(ctx, secrets, opts); err != nil {
jaeger.Status.Phase = v1.JaegerPhaseFailed
if err := r.client.Status().Update(ctx, &jaeger); err != nil {
// we let it return the real error later
jaeger.Logger().WithError(err).Error("failed to store the failed status into the current CustomResource after preconditions")
}
return jaeger, tracing.HandleError(err, span)
}
es := &storage.ElasticsearchDeployment{Jaeger: &jaeger, CertScript: "./scripts/cert_generation.sh", Secrets: secrets.Items}
err = es.CreateCerts()
if err != nil {
es.Jaeger.Logger().WithError(err).Error("failed to create Elasticsearch certificates, Elasticsearch won't be deployed")
return jaeger, err
}
str.WithSecrets(append(str.Secrets(), es.ExtractSecrets()...))
}

// secrets have to be created before ES - they are mounted to the ES pod
if err := r.applySecrets(ctx, jaeger, str.Secrets()); err != nil {
return jaeger, tracing.HandleError(err, span)
Expand Down
5 changes: 2 additions & 3 deletions pkg/strategy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
)

// For returns the appropriate Strategy for the given Jaeger instance
func For(ctx context.Context, jaeger *v1.Jaeger, secrets []corev1.Secret) S {
func For(ctx context.Context, jaeger *v1.Jaeger) S {
tracer := global.TraceProvider().GetTracer(v1.ReconciliationTracer)
ctx, span := tracer.Start(ctx, "strategy.For")
defer span.End()
Expand All @@ -50,8 +50,7 @@ func For(ctx context.Context, jaeger *v1.Jaeger, secrets []corev1.Secret) S {
return newStreamingStrategy(ctx, jaeger)
}

es := &storage.ElasticsearchDeployment{Jaeger: jaeger, CertScript: esCertGenerationScript, Secrets: secrets}
return newProductionStrategy(ctx, jaeger, es)
return newProductionStrategy(ctx, jaeger)
}

// normalize changes the incoming Jaeger object so that the defaults are applied when
Expand Down
12 changes: 6 additions & 6 deletions pkg/strategy/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
func TestNewControllerForAllInOneAsDefault(t *testing.T) {
jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestNewControllerForAllInOneAsDefault"})

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyAllInOne)
}

func TestNewControllerForAllInOneAsExplicitValue(t *testing.T) {
jaeger := v1.NewJaeger(types.NamespacedName{Name: "TestNewControllerForAllInOneAsExplicitValue"})
jaeger.Spec.Strategy = v1.DeploymentStrategyDeprecatedAllInOne // same as 'all-in-one'

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyAllInOne)
}

Expand All @@ -32,7 +32,7 @@ func TestNewControllerForProduction(t *testing.T) {
jaeger.Spec.Strategy = v1.DeploymentStrategyProduction
jaeger.Spec.Storage.Type = "elasticsearch"

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
assert.Equal(t, ctrl.Type(), v1.DeploymentStrategyProduction)
}

Expand All @@ -51,7 +51,7 @@ func TestElasticsearchAsStorageOptions(t *testing.T) {
"es.server-urls": "http://elasticsearch-example-es-cluster:9200",
})

ctrl := For(context.TODO(), jaeger, []corev1.Secret{})
ctrl := For(context.TODO(), jaeger)
deps := ctrl.Deployments()
assert.Len(t, deps, 2) // query and collector, for a production setup
counter := 0
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestDeprecatedAllInOneStrategy(t *testing.T) {
Strategy: v1.DeploymentStrategyDeprecatedAllInOne,
},
}
For(context.TODO(), jaeger, []corev1.Secret{})
For(context.TODO(), jaeger)
assert.Equal(t, v1.DeploymentStrategyAllInOne, jaeger.Spec.Strategy)
}

Expand All @@ -130,7 +130,7 @@ func TestStorageMemoryOnlyUsedWithAllInOneStrategy(t *testing.T) {
},
},
}
For(context.TODO(), jaeger, []corev1.Secret{})
For(context.TODO(), jaeger)
assert.Equal(t, v1.DeploymentStrategyAllInOne, jaeger.Spec.Strategy)
}

Expand Down
43 changes: 24 additions & 19 deletions pkg/strategy/production.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strings"

corev1 "k8s.io/api/core/v1"

"github.com/spf13/viper"
"go.opentelemetry.io/otel/global"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -22,7 +24,7 @@ import (
"github.com/jaegertracing/jaeger-operator/pkg/storage"
)

func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.ElasticsearchDeployment) S {
func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger) S {
tracer := global.TraceProvider().GetTracer(v1.ReconciliationTracer)
ctx, span := tracer.Start(ctx, "newProductionStrategy")
defer span.End()
Expand Down Expand Up @@ -104,25 +106,17 @@ func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.E

// assembles the pieces for an elasticsearch self-provisioned deployment via the elasticsearch operator
if storage.ShouldDeployElasticsearch(jaeger.Spec.Storage) {
err := es.CreateCerts()
if err != nil {
jaeger.Logger().WithError(err).Error("failed to create Elasticsearch certificates, Elasticsearch won't be deployed")
} else {
c.secrets = es.ExtractSecrets()
c.elasticsearches = append(c.elasticsearches, *es.Elasticsearch())

es.InjectStorageConfiguration(&queryDep.Spec.Template.Spec)
es.InjectStorageConfiguration(&cDep.Spec.Template.Spec)
if indexCleaner != nil {
es.InjectSecretsConfiguration(&indexCleaner.Spec.JobTemplate.Spec.Template.Spec)
}
for i := range esRollover {
es.InjectSecretsConfiguration(&esRollover[i].Spec.JobTemplate.Spec.Template.Spec)
}
for i := range c.dependencies {
es.InjectSecretsConfiguration(&c.dependencies[i].Spec.Template.Spec)
}
var jobs []*corev1.PodSpec
for i := range c.dependencies {
jobs = append(jobs, &c.dependencies[i].Spec.Template.Spec)
}
if indexCleaner != nil {
jobs = append(jobs, &indexCleaner.Spec.JobTemplate.Spec.Template.Spec)
}
for i := range esRollover {
jobs = append(jobs, &esRollover[i].Spec.JobTemplate.Spec.Template.Spec)
}
autoProvisionElasticsearch(&c, jaeger, jobs, []*appsv1.Deployment{queryDep, cDep})
}

// the index cleaner ES job, which may have been changed by the ES self-provisioning routine
Expand All @@ -138,3 +132,14 @@ func newProductionStrategy(ctx context.Context, jaeger *v1.Jaeger, es *storage.E

return c
}

func autoProvisionElasticsearch(manifest *S, jaeger *v1.Jaeger, curatorPods []*corev1.PodSpec, deployments []*appsv1.Deployment) {
es := &storage.ElasticsearchDeployment{Jaeger: jaeger}
for i := range deployments {
es.InjectStorageConfiguration(&deployments[i].Spec.Template.Spec)
}
for _, pod := range curatorPods {
es.InjectSecretsConfiguration(pod)
}
manifest.elasticsearches = append(manifest.elasticsearches, *es.Elasticsearch())
}
25 changes: 10 additions & 15 deletions pkg/strategy/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -23,7 +22,7 @@ func init() {

func TestCreateProductionDeployment(t *testing.T) {
name := "TestCreateProductionDeployment"
c := newProductionStrategy(context.Background(), v1.NewJaeger(types.NamespacedName{Name: name}), &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), v1.NewJaeger(types.NamespacedName{Name: name}))
assertDeploymentsAndServicesForProduction(t, name, c, false, false, false)
}

Expand All @@ -35,7 +34,7 @@ func TestCreateProductionDeploymentOnOpenShift(t *testing.T) {
jaeger := v1.NewJaeger(types.NamespacedName{Name: name})
normalize(context.Background(), jaeger)

c := newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), jaeger)
assertDeploymentsAndServicesForProduction(t, name, c, false, true, false)
}

Expand All @@ -45,7 +44,7 @@ func TestCreateProductionDeploymentWithDaemonSetAgent(t *testing.T) {
j := v1.NewJaeger(types.NamespacedName{Name: name})
j.Spec.Agent.Strategy = "DaemonSet"

c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
assertDeploymentsAndServicesForProduction(t, name, c, true, false, false)
}

Expand All @@ -59,7 +58,7 @@ func TestCreateProductionDeploymentWithUIConfigMap(t *testing.T) {
},
})

c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
assertDeploymentsAndServicesForProduction(t, name, c, false, false, true)
}

Expand All @@ -86,7 +85,7 @@ func TestOptionsArePassed(t *testing.T) {
},
}

ctrl := For(context.Background(), jaeger, []corev1.Secret{})
ctrl := For(context.Background(), jaeger)
deployments := ctrl.Deployments()
for _, dep := range deployments {
args := dep.Spec.Template.Spec.Containers[0].Args
Expand All @@ -110,7 +109,7 @@ func TestDelegateProductionDependencies(t *testing.T) {
// for now, we just have storage dependencies
j := v1.NewJaeger(types.NamespacedName{Name: "TestDelegateProductionDependencies"})
j.Spec.Storage.Type = "cassandra"
c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
assert.Equal(t, c.Dependencies(), storage.Dependencies(j))
}

Expand Down Expand Up @@ -165,19 +164,19 @@ func assertDeploymentsAndServicesForProduction(t *testing.T, name string, s S, h

func TestSparkDependenciesProduction(t *testing.T) {
testSparkDependencies(t, func(jaeger *v1.Jaeger) S {
return newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{Jaeger: jaeger})
return newProductionStrategy(context.Background(), jaeger)
})
}

func TestEsIndexCleanerProduction(t *testing.T) {
testEsIndexCleaner(t, func(jaeger *v1.Jaeger) S {
return newProductionStrategy(context.Background(), jaeger, &storage.ElasticsearchDeployment{Jaeger: jaeger})
return newProductionStrategy(context.Background(), jaeger)
})
}

func TestAgentSidecarIsInjectedIntoQueryForStreamingForProduction(t *testing.T) {
j := v1.NewJaeger(types.NamespacedName{Name: "TestAgentSidecarIsInjectedIntoQueryForStreamingForProduction"})
c := newProductionStrategy(context.Background(), j, &storage.ElasticsearchDeployment{})
c := newProductionStrategy(context.Background(), j)
for _, dep := range c.Deployments() {
if strings.HasSuffix(dep.Name, "-query") {
assert.Equal(t, 2, len(dep.Spec.Template.Spec.Containers))
Expand All @@ -194,11 +193,7 @@ func TestElasticsearchInject(t *testing.T) {
j.Spec.Storage.EsIndexCleaner.Enabled = &verdad
j.Spec.Storage.EsIndexCleaner.NumberOfDays = &one
j.Spec.Storage.Options = v1.NewOptions(map[string]interface{}{"es.use-aliases": true})
es := &storage.ElasticsearchDeployment{Jaeger: j, CertScript: "../../scripts/cert_generation.sh"}
err := es.CleanCerts()
require.NoError(t, err)
defer es.CleanCerts()
c := newProductionStrategy(context.Background(), j, es)
c := newProductionStrategy(context.Background(), j)
// there should be index-cleaner, rollover, lookback
assert.Equal(t, 3, len(c.cronJobs))
assertEsInjectSecrets(t, c.cronJobs[0].Spec.JobTemplate.Spec.Template.Spec)
Expand Down
Loading