Skip to content

Commit

Permalink
Add spark job (#140)
Browse files Browse the repository at this point in the history
* Add spark job

Signed-off-by: Pavol Loffay <[email protected]>

* Add generate

Signed-off-by: Pavol Loffay <[email protected]>

* Increase coverage

Signed-off-by: Pavol Loffay <[email protected]>

* Fix some review comments

Signed-off-by: Pavol Loffay <[email protected]>

* Fix format

Signed-off-by: Pavol Loffay <[email protected]>

* Fix review comments

Signed-off-by: Pavol Loffay <[email protected]>

* Use production in prod example

Signed-off-by: Pavol Loffay <[email protected]>

* Rename sparkDependencies to dependencies

Signed-off-by: Pavol Loffay <[email protected]>

* Format code

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Dec 4, 2018
1 parent 610d415 commit 3aa97dd
Show file tree
Hide file tree
Showing 15 changed files with 473 additions and 1 deletion.
2 changes: 2 additions & 0 deletions deploy/examples/simple-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ spec:
server-urls: http://elasticsearch:9200
username: elastic
password: changeme
dependencies:
enabled: true
1 change: 1 addition & 0 deletions deploy/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@ rules:
- batch
resources:
- jobs
- cronjobs
verbs:
- "*"
15 changes: 15 additions & 0 deletions pkg/apis/io/v1alpha1/jaeger_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type JaegerStorageSpec struct {
SecretName string `json:"secretName"`
Options Options `json:"options"`
CassandraCreateSchema JaegerCassandraCreateSchemaSpec `json:"cassandraCreateSchema"`
SparkDependencies JaegerDependenciesSpec `json:"dependencies"`
}

// JaegerCassandraCreateSchemaSpec holds the options related to the create-schema batch job
Expand All @@ -135,6 +136,20 @@ type JaegerCassandraCreateSchemaSpec struct {
Mode string `json:"mode"`
}

// JaegerDependenciesSpec defines options for running spark-dependencies.
type JaegerDependenciesSpec struct {
// Enabled turns on creation of the spark-dependencies cron job; the
// all-in-one strategy skips the job when it is false.
Enabled bool `json:"enabled"`
// SparkMaster is passed to the job as the SPARK_MASTER environment variable.
SparkMaster string `json:"sparkMaster"`
// Schedule is the cron schedule of the job; "55 23 * * *" is used when empty.
Schedule string `json:"schedule"`
// Image is the container image of the job; when empty it is taken from the
// "jaeger-spark-dependencies-image" operator setting.
Image string `json:"image"`
// JavaOpts is passed to the job as the JAVA_OPTS environment variable.
JavaOpts string `json:"javaOpts"`
// CassandraUseSsl is passed as CASSANDRA_USE_SSL (cassandra storage only).
CassandraUseSsl bool `json:"cassandraUseSsl"`
// CassandraLocalDc is passed as CASSANDRA_LOCAL_DC (cassandra storage only).
CassandraLocalDc string `json:"cassandraLocalDc"`
// CassandraClientAuthEnabled is passed as CASSANDRA_CLIENT_AUTH_ENABLED (cassandra storage only).
CassandraClientAuthEnabled bool `json:"cassandraClientAuthEnabled"`
// ElasticsearchClientNodeOnly is passed as ES_CLIENT_NODE_ONLY (elasticsearch storage only).
ElasticsearchClientNodeOnly bool `json:"elasticsearchClientNodeOnly"`
// ElasticsearchNodesWanOnly is passed as ES_NODES_WAN_ONLY (elasticsearch storage only).
ElasticsearchNodesWanOnly bool `json:"elasticsearchNodesWanOnly"`
}

// init registers the Jaeger and JaegerList types with the package's
// SchemeBuilder when the package is loaded.
func init() {
SchemeBuilder.Register(&Jaeger{}, &JaegerList{})
}
17 changes: 17 additions & 0 deletions pkg/apis/io/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func NewStartCommand() *cobra.Command {
cmd.Flags().String("jaeger-cassandra-schema-image", "jaegertracing/jaeger-cassandra-schema", "The Docker image for the Jaeger Cassandra Schema")
viper.BindPFlag("jaeger-cassandra-schema-image", cmd.Flags().Lookup("jaeger-cassandra-schema-image"))

cmd.Flags().String("jaeger-spark-dependencies-image", "jaegertracing/spark-dependencies", "The Docker image for the Spark Dependencies Job")
viper.BindPFlag("jaeger-spark-dependencies-image", cmd.Flags().Lookup("jaeger-spark-dependencies-image"))

cmd.Flags().String("openshift-oauth-proxy-image", "openshift/oauth-proxy:latest", "The Docker image location definition for the OpenShift OAuth Proxy")
viper.BindPFlag("openshift-oauth-proxy-image", cmd.Flags().Lookup("openshift-oauth-proxy-image"))

Expand Down
127 changes: 127 additions & 0 deletions pkg/cronjob/spark_dependencies.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package cronjob

import (
"fmt"
"strconv"

"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1"
"github.com/jaegertracing/jaeger-operator/pkg/storage"
)

// supportedStorageTypes holds the storage backends for which the
// spark-dependencies cron job can be created.
var supportedStorageTypes = map[string]bool{"elasticsearch": true, "cassandra": true}

// SupportedStorage reports whether the spark-dependencies job can run against
// the given storage type. Unknown storage types are reported as unsupported.
func SupportedStorage(storage string) bool {
	supported, known := supportedStorageTypes[storage]
	return known && supported
}

// Create assembles the CronJob that runs spark-dependencies for the given
// Jaeger instance. Defaults for the image and schedule are written into the
// instance before the job is built, and the job is owned by the instance.
func Create(jaeger *v1alpha1.Jaeger) *batchv1beta1.CronJob {
	applyDefaults(jaeger)

	// Read the dependencies spec only after defaults have been applied.
	deps := jaeger.Spec.Storage.SparkDependencies

	env := []v1.EnvVar{
		{Name: "STORAGE", Value: jaeger.Spec.Storage.Type},
		{Name: "SPARK_MASTER", Value: deps.SparkMaster},
		{Name: "JAVA_OPTS", Value: deps.JavaOpts},
	}
	env = append(env, getStorageEnvs(jaeger.Spec.Storage)...)

	jobName := fmt.Sprintf("%s-spark-dependencies", jaeger.Name)
	isController := true

	owner := metav1.OwnerReference{
		APIVersion: jaeger.APIVersion,
		Kind:       jaeger.Kind,
		Name:       jaeger.Name,
		UID:        jaeger.UID,
		Controller: &isController,
	}

	container := v1.Container{
		Image: deps.Image,
		Name:  jobName,
		// Empty variables are dropped so the spark job falls back to its own defaults.
		Env: removeEmptyVars(env),
	}

	podTemplate := v1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{
				"prometheus.io/scrape":    "false",
				"sidecar.istio.io/inject": "false",
			},
		},
		Spec: v1.PodSpec{
			Containers:    []v1.Container{container},
			RestartPolicy: v1.RestartPolicyNever,
		},
	}

	return &batchv1beta1.CronJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:            jobName,
			Namespace:       jaeger.Namespace,
			OwnerReferences: []metav1.OwnerReference{owner},
		},
		Spec: batchv1beta1.CronJobSpec{
			Schedule: deps.Schedule,
			JobTemplate: batchv1beta1.JobTemplateSpec{
				Spec: batchv1.JobSpec{Template: podTemplate},
			},
		},
	}
}

// getStorageEnvs translates the storage options and the dependencies spec into
// the environment variables handed to the spark-dependencies job. It returns
// nil for storage types other than "cassandra" and "elasticsearch".
func getStorageEnvs(s v1alpha1.JaegerStorageSpec) []v1.EnvVar {
	filtered := s.Options.Filter(storage.OptionsPrefix(s.Type))
	opts := filtered.Map()
	deps := s.SparkDependencies

	if s.Type == "cassandra" {
		// Fall back to a fixed keyspace name when none is configured.
		keyspace := opts["cassandra.keyspace"]
		if keyspace == "" {
			keyspace = "jaeger_v1_test"
		}
		return []v1.EnvVar{
			{Name: "CASSANDRA_CONTACT_POINTS", Value: opts["cassandra.servers"]},
			{Name: "CASSANDRA_KEYSPACE", Value: keyspace},
			{Name: "CASSANDRA_USERNAME", Value: opts["cassandra.username"]},
			{Name: "CASSANDRA_PASSWORD", Value: opts["cassandra.password"]},
			{Name: "CASSANDRA_USE_SSL", Value: strconv.FormatBool(deps.CassandraUseSsl)},
			{Name: "CASSANDRA_LOCAL_DC", Value: deps.CassandraLocalDc},
			{Name: "CASSANDRA_CLIENT_AUTH_ENABLED", Value: strconv.FormatBool(deps.CassandraClientAuthEnabled)},
		}
	}

	if s.Type == "elasticsearch" {
		return []v1.EnvVar{
			{Name: "ES_NODES", Value: opts["es.server-urls"]},
			{Name: "ES_INDEX_PREFIX", Value: opts["es.index-prefix"]},
			{Name: "ES_USERNAME", Value: opts["es.username"]},
			{Name: "ES_PASSWORD", Value: opts["es.password"]},
			{Name: "ES_CLIENT_NODE_ONLY", Value: strconv.FormatBool(deps.ElasticsearchClientNodeOnly)},
			{Name: "ES_NODES_WAN_ONLY", Value: strconv.FormatBool(deps.ElasticsearchNodesWanOnly)},
		}
	}

	return nil
}

// applyDefaults fills in the spark-dependencies settings the user left empty.
// It mutates jaeger in place: an empty image is replaced by the operator's
// "jaeger-spark-dependencies-image" setting and an empty schedule by
// "55 23 * * *". Values that are already set are left untouched.
func applyDefaults(jaeger *v1alpha1.Jaeger) {
	deps := &jaeger.Spec.Storage.SparkDependencies
	if deps.Image == "" {
		// viper.GetString already yields a string; no formatting is needed.
		deps.Image = viper.GetString("jaeger-spark-dependencies-image")
	}
	if deps.Schedule == "" {
		deps.Schedule = "55 23 * * *"
	}
}

// removeEmptyVars returns the variables that carry either a literal value or
// a value source, in their original order. The result is nil when no
// variable qualifies.
func removeEmptyVars(envVars []v1.EnvVar) []v1.EnvVar {
	var kept []v1.EnvVar
	for i := range envVars {
		if envVars[i].Value == "" && envVars[i].ValueFrom == nil {
			continue
		}
		kept = append(kept, envVars[i])
	}
	return kept
}
84 changes: 84 additions & 0 deletions pkg/cronjob/spark_dependencies_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cronjob

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"

"github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1"
)

// TestApplyDefaults checks that an empty schedule is replaced by the default
// and that an explicitly set schedule is left unchanged.
func TestApplyDefaults(t *testing.T) {
	withSchedule := func(schedule string) *v1alpha1.Jaeger {
		return &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{Storage: v1alpha1.JaegerStorageSpec{
			SparkDependencies: v1alpha1.JaegerDependenciesSpec{Schedule: schedule},
		}}}
	}

	cases := []struct {
		underTest *v1alpha1.Jaeger
		expected  *v1alpha1.Jaeger
	}{
		{underTest: &v1alpha1.Jaeger{}, expected: withSchedule("55 23 * * *")},
		{underTest: withSchedule("foo"), expected: withSchedule("foo")},
	}

	for _, tc := range cases {
		applyDefaults(tc.underTest)
		assert.Equal(t, tc.expected, tc.underTest)
	}
}

// TestRemoveEmptyVars checks that variables with neither a value nor a value
// source are dropped, and that a nil slice is returned when nothing remains.
func TestRemoveEmptyVars(t *testing.T) {
	type testCase struct {
		underTest []v1.EnvVar
		expected  []v1.EnvVar
	}

	cases := []testCase{
		{},
		{
			underTest: []v1.EnvVar{
				{Name: "foo", Value: "bar"},
				{Name: "foo3"},
				{Name: "foo2", ValueFrom: &v1.EnvVarSource{}},
			},
			expected: []v1.EnvVar{
				{Name: "foo", Value: "bar"},
				{Name: "foo2", ValueFrom: &v1.EnvVarSource{}},
			},
		},
		{underTest: []v1.EnvVar{{Name: "foo"}}},
	}

	for _, tc := range cases {
		assert.Equal(t, tc.expected, removeEmptyVars(tc.underTest))
	}
}

// TestStorageEnvs checks the environment variables produced for an unknown
// storage type, for cassandra and for elasticsearch.
func TestStorageEnvs(t *testing.T) {
	cassandraOpts := v1alpha1.NewOptions(map[string]interface{}{
		"cassandra.servers":  "lol:hol",
		"cassandra.keyspace": "haha",
		"cassandra.username": "jdoe",
		"cassandra.password": "none",
	})
	esOpts := v1alpha1.NewOptions(map[string]interface{}{
		"es.server-urls":  "lol:hol",
		"es.index-prefix": "haha",
		"es.username":     "jdoe",
		"es.password":     "none",
	})

	cases := []struct {
		storage  v1alpha1.JaegerStorageSpec
		expected []v1.EnvVar
	}{
		{storage: v1alpha1.JaegerStorageSpec{Type: "foo"}},
		{
			storage: v1alpha1.JaegerStorageSpec{Type: "cassandra", Options: cassandraOpts},
			expected: []v1.EnvVar{
				{Name: "CASSANDRA_CONTACT_POINTS", Value: "lol:hol"},
				{Name: "CASSANDRA_KEYSPACE", Value: "haha"},
				{Name: "CASSANDRA_USERNAME", Value: "jdoe"},
				{Name: "CASSANDRA_PASSWORD", Value: "none"},
				{Name: "CASSANDRA_USE_SSL", Value: "false"},
				{Name: "CASSANDRA_LOCAL_DC", Value: ""},
				{Name: "CASSANDRA_CLIENT_AUTH_ENABLED", Value: "false"},
			},
		},
		{
			storage: v1alpha1.JaegerStorageSpec{Type: "elasticsearch", Options: esOpts},
			expected: []v1.EnvVar{
				{Name: "ES_NODES", Value: "lol:hol"},
				{Name: "ES_INDEX_PREFIX", Value: "haha"},
				{Name: "ES_USERNAME", Value: "jdoe"},
				{Name: "ES_PASSWORD", Value: "none"},
				{Name: "ES_CLIENT_NODE_ONLY", Value: "false"},
				{Name: "ES_NODES_WAN_ONLY", Value: "false"},
			},
		},
	}

	for _, tc := range cases {
		assert.Equal(t, tc.expected, getStorageEnvs(tc.storage))
	}
}

func TestCreate(t *testing.T) {
assert.NotNil(t, Create(&v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{Storage: v1alpha1.JaegerStorageSpec{Type: "elasticsearch"}}}))
}
9 changes: 9 additions & 0 deletions pkg/strategy/all-in-one.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1"
"github.com/jaegertracing/jaeger-operator/pkg/config/sampling"
"github.com/jaegertracing/jaeger-operator/pkg/config/ui"
"github.com/jaegertracing/jaeger-operator/pkg/cronjob"
"github.com/jaegertracing/jaeger-operator/pkg/deployment"
"github.com/jaegertracing/jaeger-operator/pkg/ingress"
"github.com/jaegertracing/jaeger-operator/pkg/inject"
Expand Down Expand Up @@ -78,6 +79,14 @@ func (c *allInOneStrategy) Create() []runtime.Object {
}
}

if cronjob.SupportedStorage(c.jaeger.Spec.Storage.Type) {
if c.jaeger.Spec.Storage.SparkDependencies.Enabled {
os = append(os, cronjob.Create(c.jaeger))
} else {
logrus.Info("Spark dependencies are disabled - need to be enabled explicitly")
}
}

return os
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/strategy/all-in-one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package strategy
import (
"context"
"fmt"
"reflect"
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/runtime"

"github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1"
Expand Down Expand Up @@ -131,3 +133,42 @@ func assertDeploymentsAndServicesForAllInOne(t *testing.T, name string, objs []r
}
assertHasAllObjects(t, name, objs, deployments, daemonsets, services, ingresses, routes, serviceAccounts, configMaps)
}

func TestSparkDependenciesAllInOne(t *testing.T) {
testSparkDependencies(t, func(jaeger *v1alpha1.Jaeger) S {
return &allInOneStrategy{jaeger: jaeger}
})
}

// testSparkDependencies checks, for the strategy built by fce, that exactly
// one spark-dependencies cron job is created when the feature is enabled on a
// supported storage type, and none otherwise.
func testSparkDependencies(t *testing.T, fce func(jaeger *v1alpha1.Jaeger) S) {
	newJaeger := func(storageType string, enabled bool) *v1alpha1.Jaeger {
		return &v1alpha1.Jaeger{Spec: v1alpha1.JaegerSpec{
			Storage: v1alpha1.JaegerStorageSpec{
				Type:              storageType,
				SparkDependencies: v1alpha1.JaegerDependenciesSpec{Enabled: enabled},
			},
		}}
	}

	cases := []struct {
		jaeger              *v1alpha1.Jaeger
		sparkCronJobEnabled bool
	}{
		{jaeger: newJaeger("elasticsearch", true), sparkCronJobEnabled: true},
		{jaeger: newJaeger("cassandra", true), sparkCronJobEnabled: true},
		{jaeger: newJaeger("kafka", true), sparkCronJobEnabled: false},
		{jaeger: newJaeger("elasticsearch", false), sparkCronJobEnabled: false},
	}

	cronJobType := reflect.TypeOf(&batchv1beta1.CronJob{})
	for _, tc := range cases {
		objs := fce(tc.jaeger).Create()

		expected := 0
		if tc.sparkCronJobEnabled {
			expected = 1
		}
		assert.Equal(t, expected, len(getTypesOf(objs, cronJobType)))
	}
}
Loading

0 comments on commit 3aa97dd

Please sign in to comment.