diff --git a/deploy/examples/simple-prod.yaml b/deploy/examples/simple-prod.yaml index 2233d186a..1745f732d 100644 --- a/deploy/examples/simple-prod.yaml +++ b/deploy/examples/simple-prod.yaml @@ -12,3 +12,5 @@ spec: server-urls: http://elasticsearch:9200 username: elastic password: changeme + dependencies: + enabled: true diff --git a/deploy/role.yaml b/deploy/role.yaml index b3f9211dd..8f13f135d 100644 --- a/deploy/role.yaml +++ b/deploy/role.yaml @@ -48,5 +48,6 @@ rules: - batch resources: - jobs + - cronjobs verbs: - "*" diff --git a/pkg/apis/io/v1alpha1/jaeger_types.go b/pkg/apis/io/v1alpha1/jaeger_types.go index b45dea788..29b925119 100644 --- a/pkg/apis/io/v1alpha1/jaeger_types.go +++ b/pkg/apis/io/v1alpha1/jaeger_types.go @@ -125,6 +125,7 @@ type JaegerStorageSpec struct { SecretName string `json:"secretName"` Options Options `json:"options"` CassandraCreateSchema JaegerCassandraCreateSchemaSpec `json:"cassandraCreateSchema"` + SparkDependencies JaegerDependenciesSpec `json:"dependencies"` } // JaegerCassandraCreateSchemaSpec holds the options related to the create-schema batch job @@ -135,6 +136,20 @@ type JaegerCassandraCreateSchemaSpec struct { Mode string `json:"mode"` } +// JaegerDependenciesSpec defines options for running spark-dependencies.
+type JaegerDependenciesSpec struct { + Enabled bool `json:"enabled"` + SparkMaster string `json:"sparkMaster"` + Schedule string `json:"schedule"` + Image string `json:"image"` + JavaOpts string `json:"javaOpts"` + CassandraUseSsl bool `json:"cassandraUseSsl"` + CassandraLocalDc string `json:"cassandraLocalDc"` + CassandraClientAuthEnabled bool `json:"cassandraClientAuthEnabled"` + ElasticsearchClientNodeOnly bool `json:"elasticsearchClientNodeOnly"` + ElasticsearchNodesWanOnly bool `json:"elasticsearchNodesWanOnly"` +} + func init() { SchemeBuilder.Register(&Jaeger{}, &JaegerList{}) } diff --git a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go index b45b9bbb6..74413a814 100644 --- a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go @@ -187,6 +187,22 @@ func (in *JaegerCommonSpec) DeepCopy() *JaegerCommonSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JaegerDependenciesSpec) DeepCopyInto(out *JaegerDependenciesSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JaegerDependenciesSpec. +func (in *JaegerDependenciesSpec) DeepCopy() *JaegerDependenciesSpec { + if in == nil { + return nil + } + out := new(JaegerDependenciesSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *JaegerIngressSpec) DeepCopyInto(out *JaegerIngressSpec) { *out = *in @@ -323,6 +339,7 @@ func (in *JaegerStorageSpec) DeepCopyInto(out *JaegerStorageSpec) { *out = *in in.Options.DeepCopyInto(&out.Options) in.CassandraCreateSchema.DeepCopyInto(&out.CassandraCreateSchema) + out.SparkDependencies = in.SparkDependencies return } diff --git a/pkg/cmd/start/main.go b/pkg/cmd/start/main.go index 258c5b499..ad60ab3a5 100644 --- a/pkg/cmd/start/main.go +++ b/pkg/cmd/start/main.go @@ -45,6 +45,9 @@ func NewStartCommand() *cobra.Command { cmd.Flags().String("jaeger-cassandra-schema-image", "jaegertracing/jaeger-cassandra-schema", "The Docker image for the Jaeger Cassandra Schema") viper.BindPFlag("jaeger-cassandra-schema-image", cmd.Flags().Lookup("jaeger-cassandra-schema-image")) + cmd.Flags().String("jaeger-spark-dependencies-image", "jaegertracing/spark-dependencies", "The Docker image for the Spark Dependencies Job") + viper.BindPFlag("jaeger-spark-dependencies-image", cmd.Flags().Lookup("jaeger-spark-dependencies-image")) + cmd.Flags().String("openshift-oauth-proxy-image", "openshift/oauth-proxy:latest", "The Docker image location definition for the OpenShift OAuth Proxy") viper.BindPFlag("openshift-oauth-proxy-image", cmd.Flags().Lookup("openshift-oauth-proxy-image")) diff --git a/pkg/cronjob/spark_dependencies.go b/pkg/cronjob/spark_dependencies.go new file mode 100644 index 000000000..c484c7559 --- /dev/null +++ b/pkg/cronjob/spark_dependencies.go @@ -0,0 +1,127 @@ +package cronjob + +import ( + "fmt" + "strconv" + + "github.com/spf13/viper" + batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/storage" +) + +var supportedStorageTypes = map[string]bool{"elasticsearch": true, "cassandra": true} + +func SupportedStorage(storage 
string) bool { + return supportedStorageTypes[storage] +} + +func Create(jaeger *v1alpha1.Jaeger) *batchv1beta1.CronJob { + applyDefaults(jaeger) + + envVars := []v1.EnvVar{ + {Name: "STORAGE", Value: jaeger.Spec.Storage.Type}, + {Name: "SPARK_MASTER", Value: jaeger.Spec.Storage.SparkDependencies.SparkMaster}, + {Name: "JAVA_OPTS", Value: jaeger.Spec.Storage.SparkDependencies.JavaOpts}, + } + envVars = append(envVars, getStorageEnvs(jaeger.Spec.Storage)...) + + trueVar := true + name := fmt.Sprintf("%s-spark-dependencies", jaeger.Name) + return &batchv1beta1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: jaeger.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: jaeger.APIVersion, + Kind: jaeger.Kind, + Name: jaeger.Name, + UID: jaeger.UID, + Controller: &trueVar, + }, + }, + }, + Spec: batchv1beta1.CronJobSpec{ + Schedule: jaeger.Spec.Storage.SparkDependencies.Schedule, + JobTemplate: batchv1beta1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Image: jaeger.Spec.Storage.SparkDependencies.Image, + Name: name, + // let spark job use its default values + Env: removeEmptyVars(envVars), + }, + }, + RestartPolicy: v1.RestartPolicyNever, + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "prometheus.io/scrape": "false", + "sidecar.istio.io/inject": "false", + }, + }, + }, + }, + }, + }, + } +} + +func getStorageEnvs(s v1alpha1.JaegerStorageSpec) []v1.EnvVar { + sFlags := s.Options.Filter(storage.OptionsPrefix(s.Type)) + sFlagsMap := sFlags.Map() + keyspace := sFlagsMap["cassandra.keyspace"] + if keyspace == "" { + keyspace = "jaeger_v1_test" + } + switch s.Type { + case "cassandra": + return []v1.EnvVar{ + {Name: "CASSANDRA_CONTACT_POINTS", Value: sFlagsMap["cassandra.servers"]}, + {Name: "CASSANDRA_KEYSPACE", Value: keyspace}, + {Name: "CASSANDRA_USERNAME", Value: sFlagsMap["cassandra.username"]}, + {Name: 
"CASSANDRA_PASSWORD", Value: sFlagsMap["cassandra.password"]}, + {Name: "CASSANDRA_USE_SSL", Value: strconv.FormatBool(s.SparkDependencies.CassandraUseSsl)}, + {Name: "CASSANDRA_LOCAL_DC", Value: s.SparkDependencies.CassandraLocalDc}, + {Name: "CASSANDRA_CLIENT_AUTH_ENABLED", Value: strconv.FormatBool(s.SparkDependencies.CassandraClientAuthEnabled)}, + } + case "elasticsearch": + return []v1.EnvVar{ + {Name: "ES_NODES", Value: sFlagsMap["es.server-urls"]}, + {Name: "ES_INDEX_PREFIX", Value: sFlagsMap["es.index-prefix"]}, + {Name: "ES_USERNAME", Value: sFlagsMap["es.username"]}, + {Name: "ES_PASSWORD", Value: sFlagsMap["es.password"]}, + {Name: "ES_CLIENT_NODE_ONLY", Value: strconv.FormatBool(s.SparkDependencies.ElasticsearchClientNodeOnly)}, + {Name: "ES_NODES_WAN_ONLY", Value: strconv.FormatBool(s.SparkDependencies.ElasticsearchNodesWanOnly)}, + } + default: + return nil + } +} + +func applyDefaults(jaeger *v1alpha1.Jaeger) { + if jaeger.Spec.Storage.SparkDependencies.Image == "" { + jaeger.Spec.Storage.SparkDependencies.Image = fmt.Sprintf("%s", viper.GetString("jaeger-spark-dependencies-image")) + } + if jaeger.Spec.Storage.SparkDependencies.Schedule == "" { + jaeger.Spec.Storage.SparkDependencies.Schedule = "55 23 * * *" + } +} + +func removeEmptyVars(envVars []v1.EnvVar) []v1.EnvVar { + var notEmpty []v1.EnvVar + for _, v := range envVars { + if v.Value != "" || v.ValueFrom != nil { + notEmpty = append(notEmpty, v) + } + } + return notEmpty +} diff --git a/pkg/cronjob/spark_dependencies_test.go b/pkg/cronjob/spark_dependencies_test.go new file mode 100644 index 000000000..1a16de85f --- /dev/null +++ b/pkg/cronjob/spark_dependencies_test.go @@ -0,0 +1,84 @@ +package cronjob + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +func TestApplyDefaults(t *testing.T) { + tests := []struct { + underTest *v1alpha1.Jaeger + expected 
*v1alpha1.Jaeger + }{ + {underTest: &v1alpha1.Jaeger{}, expected: &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{Storage: v1alpha1.JaegerStorageSpec{ + SparkDependencies: v1alpha1.JaegerDependenciesSpec{Schedule: "55 23 * * *"}}}}}, + {underTest: &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{Storage: v1alpha1.JaegerStorageSpec{ + SparkDependencies: v1alpha1.JaegerDependenciesSpec{Schedule: "foo"}}}}, + expected: &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{Storage: v1alpha1.JaegerStorageSpec{ + SparkDependencies: v1alpha1.JaegerDependenciesSpec{Schedule: "foo"}}}}}, + } + for _, test := range tests { + applyDefaults(test.underTest) + assert.Equal(t, test.expected, test.underTest) + } +} + +func TestRemoveEmptyVars(t *testing.T) { + tests := []struct { + underTest []v1.EnvVar + expected []v1.EnvVar + }{ + {}, + {underTest: []v1.EnvVar{{Name: "foo", Value: "bar"}, {Name: "foo3"}, {Name: "foo2", ValueFrom: &v1.EnvVarSource{}}}, + expected: []v1.EnvVar{{Name: "foo", Value: "bar"}, {Name: "foo2", ValueFrom: &v1.EnvVarSource{}}}}, + {underTest: []v1.EnvVar{{Name: "foo"}}}, + } + for _, test := range tests { + exp := removeEmptyVars(test.underTest) + assert.Equal(t, test.expected, exp) + } +} + +func TestStorageEnvs(t *testing.T) { + tests := []struct { + storage v1alpha1.JaegerStorageSpec + expected []v1.EnvVar + }{ + {storage: v1alpha1.JaegerStorageSpec{Type: "foo"}}, + {storage: v1alpha1.JaegerStorageSpec{Type: "cassandra", + Options: v1alpha1.NewOptions(map[string]interface{}{"cassandra.servers": "lol:hol", "cassandra.keyspace": "haha", + "cassandra.username": "jdoe", "cassandra.password": "none"})}, + expected: []v1.EnvVar{ + {Name: "CASSANDRA_CONTACT_POINTS", Value: "lol:hol"}, + {Name: "CASSANDRA_KEYSPACE", Value: "haha"}, + {Name: "CASSANDRA_USERNAME", Value: "jdoe"}, + {Name: "CASSANDRA_PASSWORD", Value: "none"}, + {Name: "CASSANDRA_USE_SSL", Value: "false"}, + {Name: "CASSANDRA_LOCAL_DC", Value: ""}, + {Name: "CASSANDRA_CLIENT_AUTH_ENABLED", Value: "false"}, + }}, + 
{storage: v1alpha1.JaegerStorageSpec{Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{"es.server-urls": "lol:hol", "es.index-prefix": "haha", + "es.username": "jdoe", "es.password": "none"})}, + expected: []v1.EnvVar{ + {Name: "ES_NODES", Value: "lol:hol"}, + {Name: "ES_INDEX_PREFIX", Value: "haha"}, + {Name: "ES_USERNAME", Value: "jdoe"}, + {Name: "ES_PASSWORD", Value: "none"}, + {Name: "ES_CLIENT_NODE_ONLY", Value: "false"}, + {Name: "ES_NODES_WAN_ONLY", Value: "false"}, + }}, + } + for _, test := range tests { + envVars := getStorageEnvs(test.storage) + assert.Equal(t, test.expected, envVars) + } +} + +func TestCreate(t *testing.T) { + assert.NotNil(t, Create(&v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{Storage: v1alpha1.JaegerStorageSpec{Type: "elasticsearch"}}})) +} diff --git a/pkg/strategy/all-in-one.go b/pkg/strategy/all-in-one.go index 9b2421f7e..0252cffe4 100644 --- a/pkg/strategy/all-in-one.go +++ b/pkg/strategy/all-in-one.go @@ -12,6 +12,7 @@ import ( "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" "github.com/jaegertracing/jaeger-operator/pkg/config/sampling" "github.com/jaegertracing/jaeger-operator/pkg/config/ui" + "github.com/jaegertracing/jaeger-operator/pkg/cronjob" "github.com/jaegertracing/jaeger-operator/pkg/deployment" "github.com/jaegertracing/jaeger-operator/pkg/ingress" "github.com/jaegertracing/jaeger-operator/pkg/inject" @@ -78,6 +79,14 @@ func (c *allInOneStrategy) Create() []runtime.Object { } } + if cronjob.SupportedStorage(c.jaeger.Spec.Storage.Type) { + if c.jaeger.Spec.Storage.SparkDependencies.Enabled { + os = append(os, cronjob.Create(c.jaeger)) + } else { + logrus.Info("Spark dependencies are disabled - need to be enabled explicitly") + } + } + return os } diff --git a/pkg/strategy/all-in-one_test.go b/pkg/strategy/all-in-one_test.go index 6f7b6178a..d237e7c71 100644 --- a/pkg/strategy/all-in-one_test.go +++ 
b/pkg/strategy/all-in-one_test.go @@ -3,10 +3,12 @@ package strategy import ( "context" "fmt" + "reflect" "testing" "github.com/spf13/viper" "github.com/stretchr/testify/assert" + batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/apimachinery/pkg/runtime" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" @@ -131,3 +133,42 @@ func assertDeploymentsAndServicesForAllInOne(t *testing.T, name string, objs []r } assertHasAllObjects(t, name, objs, deployments, daemonsets, services, ingresses, routes, serviceAccounts, configMaps) } + +func TestSparkDependenciesAllInOne(t *testing.T) { + testSparkDependencies(t, func(jaeger *v1alpha1.Jaeger) S { + return &allInOneStrategy{jaeger: jaeger} + }) +} + +func testSparkDependencies(t *testing.T, fce func(jaeger *v1alpha1.Jaeger) S) { + tests := []struct { + jaeger *v1alpha1.Jaeger + sparkCronJobEnabled bool + }{ + {jaeger: &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{ + Storage: v1alpha1.JaegerStorageSpec{Type: "elasticsearch", + SparkDependencies: v1alpha1.JaegerDependenciesSpec{Enabled: true}}, + }}, sparkCronJobEnabled: true}, + {jaeger: &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{ + Storage: v1alpha1.JaegerStorageSpec{Type: "cassandra", + SparkDependencies: v1alpha1.JaegerDependenciesSpec{Enabled: true}}, + }}, sparkCronJobEnabled: true}, + {jaeger: &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{ + Storage: v1alpha1.JaegerStorageSpec{Type: "kafka", + SparkDependencies: v1alpha1.JaegerDependenciesSpec{Enabled: true}}, + }}, sparkCronJobEnabled: false}, + {jaeger: &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{ + Storage: v1alpha1.JaegerStorageSpec{Type: "elasticsearch"}, + }}, sparkCronJobEnabled: false}, + } + for _, test := range tests { + s := fce(test.jaeger) + objs := s.Create() + cronJobs := getTypesOf(objs, reflect.TypeOf(&batchv1beta1.CronJob{})) + if test.sparkCronJobEnabled { + assert.Equal(t, 1, len(cronJobs)) + } else { + assert.Equal(t, 0, len(cronJobs)) + } + } +} diff --git 
a/pkg/strategy/controller_test.go b/pkg/strategy/controller_test.go index 53ad21ee0..57da3c1c7 100644 --- a/pkg/strategy/controller_test.go +++ b/pkg/strategy/controller_test.go @@ -2,6 +2,7 @@ package strategy import ( "context" + "reflect" "testing" osv1 "github.com/openshift/api/route/v1" @@ -175,7 +176,6 @@ func getDeployments(objs []runtime.Object) []*appsv1.Deployment { deps = append(deps, obj.(*appsv1.Deployment)) } } - return deps } @@ -229,3 +229,16 @@ func assertHasAllObjects(t *testing.T, name string, objs []runtime.Object, deplo assert.True(t, v, "Expected %s to have been returned from the list of config maps", k) } } + +func getTypesOf( + objs []runtime.Object, + typ reflect.Type, +) []runtime.Object { + var theTypes []runtime.Object + for _, obj := range objs { + if typ == reflect.TypeOf(obj) { + theTypes = append(theTypes, obj) + } + } + return theTypes +} diff --git a/pkg/strategy/production.go b/pkg/strategy/production.go index f7f876247..3421e7f05 100644 --- a/pkg/strategy/production.go +++ b/pkg/strategy/production.go @@ -12,6 +12,7 @@ import ( "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" "github.com/jaegertracing/jaeger-operator/pkg/config/sampling" "github.com/jaegertracing/jaeger-operator/pkg/config/ui" + "github.com/jaegertracing/jaeger-operator/pkg/cronjob" "github.com/jaegertracing/jaeger-operator/pkg/deployment" "github.com/jaegertracing/jaeger-operator/pkg/ingress" "github.com/jaegertracing/jaeger-operator/pkg/inject" @@ -84,6 +85,14 @@ func (c *productionStrategy) Create() []runtime.Object { } } + if cronjob.SupportedStorage(c.jaeger.Spec.Storage.Type) { + if c.jaeger.Spec.Storage.SparkDependencies.Enabled { + os = append(os, cronjob.Create(c.jaeger)) + } else { + logrus.Info("Not installing spark dependencies - they need to be enabled explicitly") + } + } + return os } diff --git a/pkg/strategy/production_test.go b/pkg/strategy/production_test.go index
d477caba3..5f5e5e7bf 100644 --- a/pkg/strategy/production_test.go +++ b/pkg/strategy/production_test.go @@ -172,3 +172,9 @@ func assertDeploymentsAndServicesForProduction(t *testing.T, name string, objs [ } assertHasAllObjects(t, name, objs, deployments, daemonsets, services, ingresses, routes, serviceAccounts, configMaps) } + +func TestSparkDependenciesProduction(t *testing.T) { + testSparkDependencies(t, func(jaeger *v1alpha1.Jaeger) S { + return &productionStrategy{jaeger: jaeger} + }) +} diff --git a/test/e2e/jaeger_test.go b/test/e2e/jaeger_test.go index 336855d91..5644002f9 100644 --- a/test/e2e/jaeger_test.go +++ b/test/e2e/jaeger_test.go @@ -40,6 +40,8 @@ func TestJaeger(t *testing.T) { t.Run("daemonset", DaemonSet) t.Run("sidecar", Sidecar) t.Run("cassandra", Cassandra) + t.Run("spark-dependencies-es", SparkDependenciesElasticsearch) + t.Run("spark-dependencies-cass", SparkDependenciesCassandra) }) } diff --git a/test/e2e/spark_dependencies_test.go b/test/e2e/spark_dependencies_test.go new file mode 100644 index 000000000..f4f355cb2 --- /dev/null +++ b/test/e2e/spark_dependencies_test.go @@ -0,0 +1,89 @@ +package e2e + +import ( + "context" + "fmt" + "testing" + + framework "github.com/operator-framework/operator-sdk/pkg/test" + "github.com/operator-framework/operator-sdk/pkg/test/e2eutil" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +func SparkDependenciesElasticsearch(t *testing.T) { + testCtx := prepare(t) + defer testCtx.Cleanup() + storage := v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "es.server-urls": "http://elasticsearch.default.svc:9200", + "es.username": "elastic", + "es.password": "changeme"}), + } + if err := sparkTest(t, framework.Global, testCtx, storage); err != nil { + t.Fatal(err) + } +} + +func SparkDependenciesCassandra(t *testing.T) { + testCtx := prepare(t) + defer 
testCtx.Cleanup() + + storage := v1alpha1.JaegerStorageSpec{ + Type: "cassandra", + Options: v1alpha1.NewOptions(map[string]interface{}{"cassandra.servers": "cassandra.default.svc", "cassandra.keyspace": "jaeger_v1_datacenter1"}), + CassandraCreateSchema:v1alpha1.JaegerCassandraCreateSchemaSpec{Datacenter:"datacenter1", Mode: "prod"}, + } + if err := sparkTest(t, framework.Global, testCtx, storage); err != nil { + t.Fatal(err) + } +} + +func sparkTest(t *testing.T, f *framework.Framework, testCtx *framework.TestCtx, storage v1alpha1.JaegerStorageSpec) error { + namespace, err := testCtx.GetNamespace() + if err != nil { + return fmt.Errorf("could not get namespace: %v", err) + } + + storage.SparkDependencies = v1alpha1.JaegerDependenciesSpec{ + Enabled: true, + // run immediately + Schedule: "*/1 * * * *", + } + + name := "test-spark-deps" + j := &v1alpha1.Jaeger{ + TypeMeta: metav1.TypeMeta{ + Kind: "Jaeger", + APIVersion: "io.jaegertracing/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "allInOne", + AllInOne: v1alpha1.JaegerAllInOneSpec{}, + Storage: storage, + }, + } + + err = f.Client.Create(context.Background(), j, &framework.CleanupOptions{TestContext: testCtx, Timeout: timeout, RetryInterval: retryInterval}) + if err != nil { + return err + } + + err = WaitForCronJob(t, f.KubeClient, namespace, fmt.Sprintf("%s-spark-dependencies", name), retryInterval, timeout) + if err != nil { + return err + } + + err = WaitForJobOfAnOwner(t, f.KubeClient, namespace, fmt.Sprintf("%s-spark-dependencies", name), retryInterval, timeout) + if err != nil { + return err + } + + return e2eutil.WaitForDeployment(t, f.KubeClient, namespace, name, 1, retryInterval, timeout) +} diff --git a/test/e2e/wait_util.go b/test/e2e/wait_util.go index 39d286c8a..fafe9b007 100644 --- a/test/e2e/wait_util.go +++ b/test/e2e/wait_util.go @@ -113,3 +113,57 @@ func WaitForJob(t *testing.T, kubeclient 
kubernetes.Interface, namespace, name s t.Logf("Jobs succeeded\n") return nil } + +// WaitForJobOfAnOwner checks to see if a given job has completed successfully +// See #WaitForDeployment for the full semantics +func WaitForJobOfAnOwner(t *testing.T, kubeclient kubernetes.Interface, namespace, ownerName string, retryInterval, timeout time.Duration) error { + err := wait.Poll(retryInterval, timeout, func() (done bool, err error) { + jobList, err := kubeclient.BatchV1().Jobs(namespace).List(metav1.ListOptions{IncludeUninitialized: true}) + if err != nil { + if apierrors.IsNotFound(err) { + t.Logf("Waiting for availability of %s job owner\n", ownerName) + return false, nil + } + return false, err + } + for _, j := range jobList.Items { + for _, r := range j.OwnerReferences { + if ownerName == r.Name && j.Status.Succeeded > 0 && j.Status.Failed == 0 && j.Status.Active == 0 { + return true, nil + } + } + } + t.Logf("Waiting for job of owner %s to succeed.", ownerName) + return false, nil + }) + if err != nil { + return err + } + t.Logf("Jobs succeeded\n") + return nil +} + +// WaitForCronJob checks to see if a given cron job scheduled a job +// See #WaitForDeployment for the full semantics +func WaitForCronJob(t *testing.T, kubeclient kubernetes.Interface, namespace, name string, retryInterval, timeout time.Duration) error { + err := wait.Poll(retryInterval, timeout, func() (done bool, err error) { + cronJob, err := kubeclient.BatchV1beta1().CronJobs(namespace).Get(name, metav1.GetOptions{IncludeUninitialized: true}) + if err != nil { + if apierrors.IsNotFound(err) { + t.Logf("Waiting for availability of %s cronjob\n", name) + return false, nil + } + return false, err + } + if cronJob.Status.LastScheduleTime != nil { + return true, nil + } + t.Logf("Waiting for cronjob %s to be scheduled", name) + return false, nil + }) + if err != nil { + return err + } + t.Logf("CronJob succeeded\n") + return nil +}