-
Notifications
You must be signed in to change notification settings - Fork 344
/
spark_dependencies.go
125 lines (116 loc) · 3.87 KB
/
spark_dependencies.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package cronjob
import (
"fmt"
"github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1"
"github.com/jaegertracing/jaeger-operator/pkg/storage"
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strconv"
)
// supportedStorageTypes is the set of storage types accepted by
// SupportedStorage. A type that is missing from the map is unsupported.
var supportedStorageTypes = map[string]bool{
	"elasticsearch": true,
	"cassandra":     true,
}

// SupportedStorage reports whether the given storage type is registered, with
// a true value, in supportedStorageTypes. The lookup is an exact string match.
func SupportedStorage(storage string) bool {
	supported, found := supportedStorageTypes[storage]
	return found && supported
}
// Create builds the CronJob that runs the spark dependencies job for the given
// Jaeger instance.
//
// It first calls applyDefaults, which may fill in empty fields of
// jaeger.Spec.SparkDependencies on the object that was passed in. The returned
// CronJob is named "<jaeger name>-spark-dependencies", lives in the Jaeger
// instance's namespace and carries an owner reference to that instance with
// Controller set to true.
func Create(jaeger *v1alpha1.Jaeger) *batchv1beta1.CronJob {
	applyDefaults(jaeger)

	// Snapshot taken after applyDefaults so it reflects any defaulted fields.
	deps := jaeger.Spec.SparkDependencies

	env := append(
		[]v1.EnvVar{
			{Name: "STORAGE", Value: jaeger.Spec.Storage.Type},
			{Name: "SPARK_MASTER", Value: deps.SparkMaster},
			{Name: "JAVA_OPTS", Value: deps.JavaOpts},
		},
		getStorageEnvs(jaeger.Spec.Storage, deps)...,
	)

	jobName := fmt.Sprintf("%s-spark-dependencies", jaeger.Name)

	isController := true
	owner := metav1.OwnerReference{
		APIVersion: jaeger.APIVersion,
		Kind:       jaeger.Kind,
		Name:       jaeger.Name,
		UID:        jaeger.UID,
		Controller: &isController,
	}

	container := v1.Container{
		Name:  jobName,
		Image: deps.Image,
		// Variables without a value are dropped so that the spark job uses
		// its own default values for them.
		Env: removeEmptyVars(env),
	}

	podTemplate := v1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			// NOTE(review): presumably these opt the job pods out of
			// Prometheus scraping and Istio sidecar injection — confirm.
			Annotations: map[string]string{
				"prometheus.io/scrape":    "false",
				"sidecar.istio.io/inject": "false",
			},
		},
		Spec: v1.PodSpec{
			Containers:    []v1.Container{container},
			RestartPolicy: v1.RestartPolicyNever,
		},
	}

	return &batchv1beta1.CronJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:            jobName,
			Namespace:       jaeger.Namespace,
			OwnerReferences: []metav1.OwnerReference{owner},
		},
		Spec: batchv1beta1.CronJobSpec{
			Schedule: deps.Schedule,
			JobTemplate: batchv1beta1.JobTemplateSpec{
				Spec: batchv1.JobSpec{Template: podTemplate},
			},
		},
	}
}
// getStorageEnvs maps the storage options of s, together with the spark
// dependencies settings in sj, onto the environment variables passed to the
// spark job container.
//
// Only the options matching storage.OptionsPrefix(s.Type) are considered.
// For "cassandra" the keyspace falls back to "jaeger_v1_test" when the
// "cassandra.keyspace" option is empty. Storage types other than "cassandra"
// and "elasticsearch" yield nil. Entries may have an empty Value; the function
// does not filter them.
func getStorageEnvs(s v1alpha1.JaegerStorageSpec, sj v1alpha1.JaegerSparkDependenciesSpec) []v1.EnvVar {
	filtered := s.Options.Filter(storage.OptionsPrefix(s.Type))
	opts := filtered.Map()

	if s.Type == "cassandra" {
		keyspace := opts["cassandra.keyspace"]
		if keyspace == "" {
			keyspace = "jaeger_v1_test"
		}
		return []v1.EnvVar{
			{Name: "CASSANDRA_CONTACT_POINTS", Value: opts["cassandra.servers"]},
			{Name: "CASSANDRA_KEYSPACE", Value: keyspace},
			{Name: "CASSANDRA_USERNAME", Value: opts["cassandra.username"]},
			{Name: "CASSANDRA_PASSWORD", Value: opts["cassandra.password"]},
			{Name: "CASSANDRA_USE_SSL", Value: strconv.FormatBool(sj.CassandraUseSsl)},
			{Name: "CASSANDRA_LOCAL_DC", Value: sj.CassandraLocalDc},
			{Name: "CASSANDRA_CLIENT_AUTH_ENABLED", Value: strconv.FormatBool(sj.CassandraClientAuthEnabled)},
		}
	}

	if s.Type == "elasticsearch" {
		return []v1.EnvVar{
			{Name: "ES_NODES", Value: opts["es.server-urls"]},
			{Name: "ES_INDEX_PREFIX", Value: opts["es.index-prefix"]},
			{Name: "ES_USERNAME", Value: opts["es.username"]},
			{Name: "ES_PASSWORD", Value: opts["es.password"]},
			{Name: "ES_CLIENT_NODE_ONLY", Value: strconv.FormatBool(sj.ElasticsearchClientNodeOnly)},
			{Name: "ES_NODES_WAN_ONLY", Value: strconv.FormatBool(sj.ElasticsearchNodesWanOnly)},
		}
	}

	// Any other storage type contributes no storage-specific variables.
	return nil
}
// applyDefaults fills in the spark dependencies settings of the given Jaeger
// instance that were left empty, modifying the instance in place:
//
//   - Image defaults to the value of the "jaeger-spark-dependencies-image"
//     viper setting.
//   - Schedule defaults to "55 23 * * *".
//
// Fields that already hold a non-empty value are left untouched.
func applyDefaults(jaeger *v1alpha1.Jaeger) {
	if jaeger.Spec.SparkDependencies.Image == "" {
		// viper.GetString already yields a string, so it is assigned directly
		// instead of being passed through a "%s" format call.
		jaeger.Spec.SparkDependencies.Image = viper.GetString("jaeger-spark-dependencies-image")
	}
	if jaeger.Spec.SparkDependencies.Schedule == "" {
		// Standard five-field cron expression: minute 55 of hour 23, every day.
		jaeger.Spec.SparkDependencies.Schedule = "55 23 * * *"
	}
}
// removeEmptyVars returns the entries of envVars that have either a non-empty
// Value or a non-nil ValueFrom, preserving their original order. The input
// slice is not modified. The result is nil when no entry qualifies.
func removeEmptyVars(envVars []v1.EnvVar) []v1.EnvVar {
	var kept []v1.EnvVar
	for i := range envVars {
		// Skip entries that carry neither a literal value nor a value source.
		if envVars[i].Value == "" && envVars[i].ValueFrom == nil {
			continue
		}
		kept = append(kept, envVars[i])
	}
	return kept
}