From 9c9146a333d3ba4b80ec61d96f633f06cf5555fe Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Thu, 13 Dec 2018 17:17:06 +0000 Subject: [PATCH 01/14] Add ingester (and kafka) support Signed-off-by: Gary Brown --- pkg/apis/io/v1alpha1/jaeger_types.go | 9 + pkg/apis/io/v1alpha1/zz_generated.deepcopy.go | 19 + pkg/cmd/start/main.go | 3 + pkg/deployment/collector.go | 11 +- pkg/deployment/collector_test.go | 72 ++++ pkg/deployment/ingester.go | 159 +++++++++ pkg/deployment/ingester_test.go | 326 ++++++++++++++++++ pkg/service/ingester.go | 56 +++ pkg/service/ingester_test.go | 44 +++ pkg/strategy/production.go | 10 + 10 files changed, 707 insertions(+), 2 deletions(-) create mode 100644 pkg/deployment/ingester.go create mode 100644 pkg/deployment/ingester_test.go create mode 100644 pkg/service/ingester.go create mode 100644 pkg/service/ingester_test.go diff --git a/pkg/apis/io/v1alpha1/jaeger_types.go b/pkg/apis/io/v1alpha1/jaeger_types.go index 67910cf23..a0839ab1d 100644 --- a/pkg/apis/io/v1alpha1/jaeger_types.go +++ b/pkg/apis/io/v1alpha1/jaeger_types.go @@ -50,6 +50,7 @@ type JaegerSpec struct { AllInOne JaegerAllInOneSpec `json:"allInOne"` Query JaegerQuerySpec `json:"query"` Collector JaegerCollectorSpec `json:"collector"` + Ingester JaegerIngesterSpec `json:"ingester"` Agent JaegerAgentSpec `json:"agent"` UI JaegerUISpec `json:"ui"` Sampling JaegerSamplingSpec `json:"sampling"` @@ -111,6 +112,14 @@ type JaegerCollectorSpec struct { JaegerCommonSpec } +// JaegerIngesterSpec defines the options to be used when deploying the ingester +type JaegerIngesterSpec struct { + Size int `json:"size"` + Image string `json:"image"` + Options Options `json:"options"` + JaegerCommonSpec +} + // JaegerAgentSpec defines the options to be used when deploying the agent type JaegerAgentSpec struct { Strategy string `json:"strategy"` // can be either 'DaemonSet' or 'Sidecar' (default) diff --git a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go index 4a6d4e44e..c5ea5e67c 100644 --- a/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/io/v1alpha1/zz_generated.deepcopy.go @@ -229,6 +229,24 @@ func (in *JaegerEsIndexCleanerSpec) DeepCopy() *JaegerEsIndexCleanerSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JaegerIngesterSpec) DeepCopyInto(out *JaegerIngesterSpec) { + *out = *in + in.Options.DeepCopyInto(&out.Options) + in.JaegerCommonSpec.DeepCopyInto(&out.JaegerCommonSpec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JaegerIngesterSpec. +func (in *JaegerIngesterSpec) DeepCopy() *JaegerIngesterSpec { + if in == nil { + return nil + } + out := new(JaegerIngesterSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JaegerIngressSpec) DeepCopyInto(out *JaegerIngressSpec) { *out = *in @@ -325,6 +343,7 @@ func (in *JaegerSpec) DeepCopyInto(out *JaegerSpec) { in.AllInOne.DeepCopyInto(&out.AllInOne) in.Query.DeepCopyInto(&out.Query) in.Collector.DeepCopyInto(&out.Collector) + in.Ingester.DeepCopyInto(&out.Ingester) in.Agent.DeepCopyInto(&out.Agent) in.UI.DeepCopyInto(&out.UI) in.Sampling.DeepCopyInto(&out.Sampling) diff --git a/pkg/cmd/start/main.go b/pkg/cmd/start/main.go index 783b50514..e3518386a 100644 --- a/pkg/cmd/start/main.go +++ b/pkg/cmd/start/main.go @@ -39,6 +39,9 @@ func NewStartCommand() *cobra.Command { cmd.Flags().String("jaeger-collector-image", "jaegertracing/jaeger-collector", "The Docker image for the Jaeger Collector") viper.BindPFlag("jaeger-collector-image", cmd.Flags().Lookup("jaeger-collector-image")) + cmd.Flags().String("jaeger-ingester-image", "jaegertracing/jaeger-ingester", "The Docker image for the Jaeger Ingester") + viper.BindPFlag("jaeger-ingester-image", cmd.Flags().Lookup("jaeger-ingester-image")) + cmd.Flags().String("jaeger-all-in-one-image", "jaegertracing/all-in-one", "The Docker image for the Jaeger all-in-one") viper.BindPFlag("jaeger-all-in-one-image", cmd.Flags().Lookup("jaeger-all-in-one-image")) diff --git a/pkg/deployment/collector.go b/pkg/deployment/collector.go index e17767d1a..8d4e8da44 100644 --- a/pkg/deployment/collector.go +++ b/pkg/deployment/collector.go @@ -64,8 +64,15 @@ func (c *Collector) Get() *appsv1.Deployment { }) } + storageType := c.jaeger.Spec.Storage.Type + // If ingester options have been defined, then change storage type + // to Kafka, and the storage options will be used in the Ingester instead + if len(c.jaeger.Spec.Ingester.Options.Map()) > 0 { + storageType = "kafka" + } options := allArgs(c.jaeger.Spec.Collector.Options, - c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(c.jaeger.Spec.Storage.Type))) + c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)), + c.jaeger.Spec.Ingester.Options.Filter(storage.OptionsPrefix(storageType))) sampling.Update(c.jaeger, commonSpec, &options) @@ -105,7 +112,7 @@ func (c *Collector) Get() *appsv1.Deployment { Env: []v1.EnvVar{ v1.EnvVar{ Name: "SPAN_STORAGE_TYPE", - Value: c.jaeger.Spec.Storage.Type, + Value: storageType, }, v1.EnvVar{ Name: "COLLECTOR_ZIPKIN_HTTP_PORT", diff --git a/pkg/deployment/collector_test.go b/pkg/deployment/collector_test.go index bab0fb389..734bb29b8 100644 --- a/pkg/deployment/collector_test.go +++ b/pkg/deployment/collector_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" ) @@ -276,3 +277,74 @@ func TestCollectorLabels(t *testing.T) { assert.Equal(t, c.jaeger.Name, dep.Spec.Template.Labels["app.kubernetes.io/instance"]) assert.Equal(t, fmt.Sprintf("%s-collector", c.jaeger.Name), dep.Spec.Template.Labels["app.kubernetes.io/name"]) } + +func TestCollectorWithDirectStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestCollectorWithDirectStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "es.server-urls": "http://somewhere", + }), + }, + }, + } + collector := NewCollector(jaeger) + dep := collector.Get() + + envvars := []v1.EnvVar{ + v1.EnvVar{ + Name: "SPAN_STORAGE_TYPE", + Value: "elasticsearch", + }, + v1.EnvVar{ + Name: "COLLECTOR_ZIPKIN_HTTP_PORT", + Value: "9411", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 2) + assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[0]) +} + +func TestCollectorWithIngesterStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestCollectorWithIngesterStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.topic": "mytopic", + }), + }, + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.brokers": "http://brokers", + "es.server-urls": "http://somewhere", + }), + }, + }, + } + collector := NewCollector(jaeger) + dep := collector.Get() + + envvars := []v1.EnvVar{ + v1.EnvVar{ + Name: "SPAN_STORAGE_TYPE", + Value: "kafka", + }, + v1.EnvVar{ + Name: "COLLECTOR_ZIPKIN_HTTP_PORT", + Value: "9411", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3) + assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0]) + assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1]) +} diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go new file mode 100644 index 000000000..e3b992bb9 --- /dev/null +++ b/pkg/deployment/ingester.go @@ -0,0 +1,159 @@ +package deployment + +import ( + "fmt" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/config/sampling" + "github.com/jaegertracing/jaeger-operator/pkg/service" + "github.com/jaegertracing/jaeger-operator/pkg/storage" + "github.com/jaegertracing/jaeger-operator/pkg/util" +) + +// Ingester builds pods for jaegertracing/jaeger-ingester +type Ingester struct { + jaeger *v1alpha1.Jaeger +} + +// NewIngester builds a new Ingester struct based on the given spec +func NewIngester(jaeger *v1alpha1.Jaeger) *Ingester { + if jaeger.Spec.Ingester.Size <= 0 { + jaeger.Spec.Ingester.Size = 1 + } + + if jaeger.Spec.Ingester.Image == "" { + jaeger.Spec.Ingester.Image = fmt.Sprintf("%s:%s", viper.GetString("jaeger-ingester-image"), viper.GetString("jaeger-version")) + } + + return &Ingester{jaeger: jaeger} +} + +// Get returns a ingester pod +func (i *Ingester) Get() *appsv1.Deployment { + // Only create a deployment if ingester options have been defined + if len(i.jaeger.Spec.Ingester.Options.Map()) == 0 { + return nil + } + + logrus.Debugf("Assembling a ingester deployment for %v", i.jaeger) + + selector := i.selector() + trueVar := true + replicas := int32(i.jaeger.Spec.Ingester.Size) + + baseCommonSpec := v1alpha1.JaegerCommonSpec{ + Annotations: map[string]string{ + "prometheus.io/scrape": "true", + // TODO: Need to check if metrics port is exposed, and on what port number + "prometheus.io/port": "14268", + "sidecar.istio.io/inject": "false", + }, + } + + commonSpec := util.Merge([]v1alpha1.JaegerCommonSpec{i.jaeger.Spec.Ingester.JaegerCommonSpec, i.jaeger.Spec.JaegerCommonSpec, baseCommonSpec}) + + var envFromSource []v1.EnvFromSource + if len(i.jaeger.Spec.Storage.SecretName) > 0 { + envFromSource = append(envFromSource, v1.EnvFromSource{ + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: i.jaeger.Spec.Storage.SecretName, + }, + }, + }) + } + + options := allArgs(i.jaeger.Spec.Ingester.Options, + i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)), + i.jaeger.Spec.Storage.Options.Filter("kafka")) + + sampling.Update(i.jaeger, commonSpec, &options) + + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-ingester", i.jaeger.Name), + Namespace: i.jaeger.Namespace, + OwnerReferences: []metav1.OwnerReference{ + metav1.OwnerReference{ + APIVersion: i.jaeger.APIVersion, + Kind: i.jaeger.Kind, + Name: i.jaeger.Name, + UID: i.jaeger.UID, + Controller: &trueVar, + }, + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: selector, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: selector, + Annotations: commonSpec.Annotations, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Image: i.jaeger.Spec.Ingester.Image, + Name: "jaeger-ingester", + Args: options, + Env: []v1.EnvVar{ + v1.EnvVar{ + Name: "SPAN_STORAGE_TYPE", + Value: i.jaeger.Spec.Storage.Type, + }, + }, + VolumeMounts: commonSpec.VolumeMounts, + EnvFrom: envFromSource, + Ports: []v1.ContainerPort{ + { + ContainerPort: 14267, + Name: "c-tchan-trft", + }, + }, + ReadinessProbe: &v1.Probe{ + Handler: v1.Handler{ + HTTPGet: &v1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt(14267), + }, + }, + InitialDelaySeconds: 1, + }, + Resources: commonSpec.Resources, + }}, + Volumes: commonSpec.Volumes, + }, + }, + }, + } +} + +// Services returns a list of services to be deployed along with the ingesterdeployment +func (i *Ingester) Services() []*v1.Service { + services := []*v1.Service{} + + service := service.NewIngesterService(i.jaeger, i.selector()) + + if service != nil { + services = append(services, service) + } + + return services +} + +func (i *Ingester) selector() map[string]string { + return map[string]string{"app": "jaeger", "jaeger": i.jaeger.Name, "jaeger-component": "ingester"} +} diff --git a/pkg/deployment/ingester_test.go b/pkg/deployment/ingester_test.go new file mode 100644 index 000000000..b0c4f5269 --- /dev/null +++ b/pkg/deployment/ingester_test.go @@ -0,0 +1,326 @@ +package deployment + +import ( + "testing" + + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +func init() { + viper.SetDefault("jaeger-version", "1.6") + viper.SetDefault("jaeger-ingester-image", "jaegertracing/jaeger-ingester") +} + +func TestIngesterNotDefined(t *testing.T) { + jaeger := v1alpha1.NewJaeger("TestIngesterNotDefined") + + ingester := NewIngester(jaeger) + assert.Nil(t, ingester.Get()) +} + +func TestIngesterNegativeSize(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterNegativeSize") + jaeger.Spec.Ingester.Size = -1 + + ingester := NewIngester(jaeger) + dep := ingester.Get() + assert.Equal(t, int32(1), *dep.Spec.Replicas) +} + +func TestIngesterDefaultSize(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterDefaultSize") + jaeger.Spec.Ingester.Size = 0 + + ingester := NewIngester(jaeger) + dep := ingester.Get() + assert.Equal(t, int32(1), *dep.Spec.Replicas) +} + +func TestIngesterName(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterName") + ingester := NewIngester(jaeger) + dep := ingester.Get() + assert.Equal(t, "TestIngesterName-ingester", dep.ObjectMeta.Name) +} + +func TestIngesterServices(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterServices") + ingester := NewIngester(jaeger) + svcs := ingester.Services() + assert.Len(t, svcs, 1) +} + +func TestDefaultIngesterImage(t *testing.T) { + viper.Set("jaeger-ingester-image", "org/custom-ingester-image") + viper.Set("jaeger-version", "123") + defer viper.Reset() + + ingester := NewIngester(newIngesterJaeger("TestIngesterImage")) + dep := ingester.Get() + + containers := dep.Spec.Template.Spec.Containers + assert.Len(t, containers, 1) + assert.Equal(t, "org/custom-ingester-image:123", containers[0].Image) + + envvars := []v1.EnvVar{ + v1.EnvVar{ + Name: "SPAN_STORAGE_TYPE", + Value: "", + }, + } + assert.Equal(t, envvars, containers[0].Env) +} + +func TestIngesterAnnotations(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterAnnotations") + jaeger.Spec.Annotations = map[string]string{ + "name": "operator", + "hello": "jaeger", + } + jaeger.Spec.Ingester.Annotations = map[string]string{ + "hello": "world", // Override top level annotation + "prometheus.io/scrape": "false", // Override implicit value + } + + ingester := NewIngester(jaeger) + dep := ingester.Get() + + assert.Equal(t, "operator", dep.Spec.Template.Annotations["name"]) + assert.Equal(t, "false", dep.Spec.Template.Annotations["sidecar.istio.io/inject"]) + assert.Equal(t, "world", dep.Spec.Template.Annotations["hello"]) + assert.Equal(t, "false", dep.Spec.Template.Annotations["prometheus.io/scrape"]) +} + +func TestIngesterSecrets(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterSecrets") + secret := "mysecret" + jaeger.Spec.Storage.SecretName = secret + + ingester := NewIngester(jaeger) + dep := ingester.Get() + + assert.Equal(t, "mysecret", dep.Spec.Template.Spec.Containers[0].EnvFrom[0].SecretRef.LocalObjectReference.Name) +} + +func TestIngesterVolumeMountsWithVolumes(t *testing.T) { + name := "TestIngesterVolumeMountsWithVolumes" + + globalVolumes := []v1.Volume{ + v1.Volume{ + Name: "globalVolume", + VolumeSource: v1.VolumeSource{}, + }, + } + + globalVolumeMounts := []v1.VolumeMount{ + v1.VolumeMount{ + Name: "globalVolume", + }, + } + + ingesterVolumes := []v1.Volume{ + v1.Volume{ + Name: "ingesterVolume", + VolumeSource: v1.VolumeSource{}, + }, + } + + ingesterVolumeMounts := []v1.VolumeMount{ + v1.VolumeMount{ + Name: "ingesterVolume", + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.Volumes = globalVolumes + jaeger.Spec.VolumeMounts = globalVolumeMounts + jaeger.Spec.Ingester.Volumes = ingesterVolumes + jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + // Additional 1 is sampling configmap + assert.Len(t, podSpec.Volumes, len(append(ingesterVolumes, globalVolumes...))+1) + assert.Len(t, podSpec.Containers[0].VolumeMounts, len(append(ingesterVolumeMounts, globalVolumeMounts...))+1) + + // ingester is first while global is second + assert.Equal(t, "ingesterVolume", podSpec.Volumes[0].Name) + assert.Equal(t, "globalVolume", podSpec.Volumes[1].Name) + assert.Equal(t, "ingesterVolume", podSpec.Containers[0].VolumeMounts[0].Name) + assert.Equal(t, "globalVolume", podSpec.Containers[0].VolumeMounts[1].Name) +} + +func TestIngesterMountGlobalVolumes(t *testing.T) { + name := "TestIngesterMountGlobalVolumes" + + globalVolumes := []v1.Volume{ + v1.Volume{ + Name: "globalVolume", + VolumeSource: v1.VolumeSource{}, + }, + } + + ingesterVolumeMounts := []v1.VolumeMount{ + v1.VolumeMount{ + Name: "globalVolume", + ReadOnly: true, + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.Volumes = globalVolumes + jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + // Count includes the sampling configmap + assert.Len(t, podSpec.Containers[0].VolumeMounts, 2) + // ingester volume is mounted + assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].Name, "globalVolume") +} + +func TestIngesterVolumeMountsWithSameName(t *testing.T) { + name := "TestIngesterVolumeMountsWithSameName" + + globalVolumeMounts := []v1.VolumeMount{ + v1.VolumeMount{ + Name: "data", + ReadOnly: true, + }, + } + + ingesterVolumeMounts := []v1.VolumeMount{ + v1.VolumeMount{ + Name: "data", + ReadOnly: false, + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.VolumeMounts = globalVolumeMounts + jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + // Count includes the sampling configmap + assert.Len(t, podSpec.Containers[0].VolumeMounts, 2) + // ingester volume is mounted + assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].ReadOnly, false) +} + +func TestIngesterVolumeWithSameName(t *testing.T) { + name := "TestIngesterVolumeWithSameName" + + globalVolumes := []v1.Volume{ + v1.Volume{ + Name: "data", + VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, + }, + } + + ingesterVolumes := []v1.Volume{ + v1.Volume{ + Name: "data", + VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, + }, + } + + jaeger := newIngesterJaeger(name) + jaeger.Spec.Volumes = globalVolumes + jaeger.Spec.Ingester.Volumes = ingesterVolumes + podSpec := NewIngester(jaeger).Get().Spec.Template.Spec + + // Count includes the sampling configmap + assert.Len(t, podSpec.Volumes, 2) + // ingester volume is mounted + assert.Equal(t, podSpec.Volumes[0].VolumeSource.HostPath.Path, "/data2") +} + +func TestIngesterResources(t *testing.T) { + jaeger := newIngesterJaeger("TestIngesterResources") + jaeger.Spec.Resources = v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceLimitsCPU: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourceLimitsEphemeralStorage: *resource.NewQuantity(512, resource.DecimalSI), + }, + Requests: v1.ResourceList{ + v1.ResourceRequestsCPU: *resource.NewQuantity(1024, resource.BinarySI), + v1.ResourceRequestsEphemeralStorage: *resource.NewQuantity(512, resource.DecimalSI), + }, + } + jaeger.Spec.Ingester.Resources = v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceLimitsCPU: *resource.NewQuantity(2048, resource.BinarySI), + v1.ResourceLimitsMemory: *resource.NewQuantity(123, resource.DecimalSI), + }, + Requests: v1.ResourceList{ + v1.ResourceRequestsCPU: *resource.NewQuantity(2048, resource.BinarySI), + v1.ResourceRequestsMemory: *resource.NewQuantity(123, resource.DecimalSI), + }, + } + + ingester := NewIngester(jaeger) + dep := ingester.Get() + + assert.Equal(t, *resource.NewQuantity(2048, resource.BinarySI), dep.Spec.Template.Spec.Containers[0].Resources.Limits[v1.ResourceLimitsCPU]) + assert.Equal(t, *resource.NewQuantity(2048, resource.BinarySI), dep.Spec.Template.Spec.Containers[0].Resources.Requests[v1.ResourceRequestsCPU]) + assert.Equal(t, *resource.NewQuantity(123, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Limits[v1.ResourceLimitsMemory]) + assert.Equal(t, *resource.NewQuantity(123, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Requests[v1.ResourceRequestsMemory]) + assert.Equal(t, *resource.NewQuantity(512, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Limits[v1.ResourceLimitsEphemeralStorage]) + assert.Equal(t, *resource.NewQuantity(512, resource.DecimalSI), dep.Spec.Template.Spec.Containers[0].Resources.Requests[v1.ResourceRequestsEphemeralStorage]) +} + +func TestIngesterWithStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestIngesterStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.topic": "mytopic", + }), + }, + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.brokers": "http://brokers", + "es.server-urls": "http://somewhere", + }), + }, + }, + } + ingester := NewIngester(jaeger) + dep := ingester.Get() + + envvars := []v1.EnvVar{ + v1.EnvVar{ + Name: "SPAN_STORAGE_TYPE", + Value: "elasticsearch", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 4) + assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[0]) + assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[1]) + assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[2]) + // Fourth arg is sampling strategy file +} + +func newIngesterJaeger(name string) *v1alpha1.Jaeger { + return &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha1.JaegerSpec{ + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "any": "option", + }), + }, + }, + } +} diff --git a/pkg/service/ingester.go b/pkg/service/ingester.go new file mode 100644 index 000000000..5cc36a9bd --- /dev/null +++ b/pkg/service/ingester.go @@ -0,0 +1,56 @@ +package service + +import ( + "fmt" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +// NewIngesterService returns a new Kubernetes service for Jaeger Ingester backed by the pods matching the selector +func NewIngesterService(jaeger *v1alpha1.Jaeger, selector map[string]string) *v1.Service { + // Only create a service if ingester options have been defined + if len(jaeger.Spec.Ingester.Options.Map()) == 0 { + return nil + } + + trueVar := true + + return &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: GetNameForIngesterService(jaeger), + Namespace: jaeger.Namespace, + Labels: selector, + OwnerReferences: []metav1.OwnerReference{ + metav1.OwnerReference{ + APIVersion: jaeger.APIVersion, + Kind: jaeger.Kind, + Name: jaeger.Name, + UID: jaeger.UID, + Controller: &trueVar, + }, + }, + }, + Spec: v1.ServiceSpec{ + Selector: selector, + ClusterIP: "None", + Ports: []v1.ServicePort{ + { + Name: "c-tchan-trft", + Port: 14267, + }, + }, + }, + } +} + +// GetNameForIngesterService returns the service name for the ingester in this Jaeger instance +func GetNameForIngesterService(jaeger *v1alpha1.Jaeger) string { + return fmt.Sprintf("%s-ingester", jaeger.Name) +} diff --git a/pkg/service/ingester_test.go b/pkg/service/ingester_test.go new file mode 100644 index 000000000..c4b7afe4e --- /dev/null +++ b/pkg/service/ingester_test.go @@ -0,0 +1,44 @@ +package service + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" +) + +func TestIngesterServiceNameAndPorts(t *testing.T) { + name := "TestIngesterServiceNameAndPorts" + selector := map[string]string{"app": "myapp", "jaeger": name, "jaeger-component": "ingester"} + + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha1.JaegerSpec{ + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "any": "option", + }), + }, + }, + } + svc := NewIngesterService(jaeger, selector) + assert.Equal(t, svc.ObjectMeta.Name, fmt.Sprintf("%s-ingester", name)) + + ports := map[int32]bool{ + 14267: false, + } + + for _, port := range svc.Spec.Ports { + ports[port.Port] = true + } + + for k, v := range ports { + assert.Equal(t, v, true, "Expected port %v to be specified, but wasn't", k) + } + +} diff --git a/pkg/strategy/production.go b/pkg/strategy/production.go index 6362845a8..64b4bbeb6 100644 --- a/pkg/strategy/production.go +++ b/pkg/strategy/production.go @@ -37,6 +37,7 @@ func (c *productionStrategy) Create() []runtime.Object { collector := deployment.NewCollector(c.jaeger) query := deployment.NewQuery(c.jaeger) agent := deployment.NewAgent(c.jaeger) + ingester := deployment.NewIngester(c.jaeger) os := []runtime.Object{} // add all service accounts @@ -62,6 +63,11 @@ func (c *productionStrategy) Create() []runtime.Object { inject.OAuthProxy(c.jaeger, query.Get()), ) + ingesterDeployment := ingester.Get() + if ingesterDeployment != nil { + os = append(os, ingesterDeployment) + } + if ds := agent.Get(); nil != ds { os = append(os, ds) } @@ -75,6 +81,10 @@ func (c *productionStrategy) Create() []runtime.Object { os = append(os, svc) } + for _, svc := range ingester.Services() { + os = append(os, svc) + } + // add the routes/ingresses if viper.GetString("platform") == v1alpha1.FlagPlatformOpenShift { if q := route.NewQueryRoute(c.jaeger).Get(); nil != q { From c8ecbdd0fbc572cfdad841304e4c57ebabfc27c6 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Thu, 3 Jan 2019 17:48:16 +0000 Subject: [PATCH 02/14] Remove unnecessary type declarations in arrays for deployment tests Signed-off-by: Gary Brown --- pkg/deployment/all-in-one_test.go | 24 +++++++++++------------ pkg/deployment/collector_test.go | 32 +++++++++++++++---------------- pkg/deployment/ingester_test.go | 24 +++++++++++------------ pkg/deployment/query_test.go | 20 +++++++++---------- 4 files changed, 50 insertions(+), 50 deletions(-) diff --git a/pkg/deployment/all-in-one_test.go b/pkg/deployment/all-in-one_test.go index d2b215e9d..8b7ca409c 100644 --- a/pkg/deployment/all-in-one_test.go +++ b/pkg/deployment/all-in-one_test.go @@ -27,11 +27,11 @@ func TestDefaultAllInOneImage(t *testing.T) { assert.Equal(t, "org/custom-all-in-one-image:123", d.Spec.Template.Spec.Containers[0].Image) envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "", }, - v1.EnvVar{ + { Name: "COLLECTOR_ZIPKIN_HTTP_PORT", Value: "9411", }, @@ -80,27 +80,27 @@ func TestAllInOneVolumeMountsWithVolumes(t *testing.T) { name := "TestAllInOneVolumeMountsWithVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", }, } allInOneVolumes := []v1.Volume{ - v1.Volume{ + { Name: "allInOneVolume", VolumeSource: v1.VolumeSource{}, }, } allInOneVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "allInOneVolume", }, } @@ -138,14 +138,14 @@ func TestAllInOneMountGlobalVolumes(t *testing.T) { name := "TestAllInOneMountGlobalVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } allInOneVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", ReadOnly: true, }, @@ -166,14 +166,14 @@ func TestAllInOneVolumeMountsWithSameName(t *testing.T) { name := "TestAllInOneVolumeMountsWithSameName" globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: true, }, } allInOneVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: false, }, @@ -194,14 +194,14 @@ func TestAllInOneVolumeWithSameName(t *testing.T) { name := "TestAllInOneVolumeWithSameName" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, }, } allInOneVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, }, diff --git a/pkg/deployment/collector_test.go b/pkg/deployment/collector_test.go index 734bb29b8..97db3a366 100644 --- a/pkg/deployment/collector_test.go +++ b/pkg/deployment/collector_test.go @@ -61,11 +61,11 @@ func TestDefaultCollectorImage(t *testing.T) { assert.Equal(t, "org/custom-collector-image:123", containers[0].Image) envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "", }, - v1.EnvVar{ + { Name: "COLLECTOR_ZIPKIN_HTTP_PORT", Value: "9411", }, @@ -108,27 +108,27 @@ func TestCollectorVolumeMountsWithVolumes(t *testing.T) { name := "TestCollectorVolumeMountsWithVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", }, } collectorVolumes := []v1.Volume{ - v1.Volume{ + { Name: "collectorVolume", VolumeSource: v1.VolumeSource{}, }, } collectorVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "collectorVolume", }, } @@ -155,14 +155,14 @@ func TestCollectorMountGlobalVolumes(t *testing.T) { name := "TestCollectorMountGlobalVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } collectorVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", ReadOnly: true, }, @@ -183,14 +183,14 @@ func TestCollectorVolumeMountsWithSameName(t *testing.T) { name := "TestCollectorVolumeMountsWithSameName" globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: true, }, } collectorVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: false, }, @@ -211,14 +211,14 @@ func TestCollectorVolumeWithSameName(t *testing.T) { name := "TestCollectorVolumeWithSameName" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, }, } collectorVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, }, @@ -296,11 +296,11 @@ func TestCollectorWithDirectStorageType(t *testing.T) { dep := collector.Get() envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "elasticsearch", }, - v1.EnvVar{ + { Name: "COLLECTOR_ZIPKIN_HTTP_PORT", Value: "9411", }, @@ -334,11 +334,11 @@ func TestCollectorWithIngesterStorageType(t *testing.T) { dep := collector.Get() envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "kafka", }, - v1.EnvVar{ + { Name: "COLLECTOR_ZIPKIN_HTTP_PORT", Value: "9411", }, diff --git a/pkg/deployment/ingester_test.go b/pkg/deployment/ingester_test.go index b0c4f5269..7bbe5df89 100644 --- a/pkg/deployment/ingester_test.go +++ b/pkg/deployment/ingester_test.go @@ -69,7 +69,7 @@ func TestDefaultIngesterImage(t *testing.T) { assert.Equal(t, "org/custom-ingester-image:123", containers[0].Image) envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "", }, @@ -112,27 +112,27 @@ func TestIngesterVolumeMountsWithVolumes(t *testing.T) { name := "TestIngesterVolumeMountsWithVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", }, } ingesterVolumes := []v1.Volume{ - v1.Volume{ + { Name: "ingesterVolume", VolumeSource: v1.VolumeSource{}, }, } ingesterVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "ingesterVolume", }, } @@ -159,14 +159,14 @@ func TestIngesterMountGlobalVolumes(t *testing.T) { name := "TestIngesterMountGlobalVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } ingesterVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", ReadOnly: true, }, @@ -187,14 +187,14 @@ func TestIngesterVolumeMountsWithSameName(t *testing.T) { name := "TestIngesterVolumeMountsWithSameName" globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: true, }, } ingesterVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: false, }, @@ -215,14 +215,14 @@ func TestIngesterVolumeWithSameName(t *testing.T) { name := "TestIngesterVolumeWithSameName" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, }, } ingesterVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, }, @@ -297,7 +297,7 @@ func TestIngesterWithStorageType(t *testing.T) { dep := ingester.Get() envvars := []v1.EnvVar{ - v1.EnvVar{ + { Name: "SPAN_STORAGE_TYPE", Value: "elasticsearch", }, diff --git a/pkg/deployment/query_test.go b/pkg/deployment/query_test.go index 195ebb393..a6dedaf8f 100644 --- a/pkg/deployment/query_test.go +++ b/pkg/deployment/query_test.go @@ -98,27 +98,27 @@ func TestQueryVolumeMountsWithVolumes(t *testing.T) { name := "TestQueryVolumeMountsWithVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", }, } queryVolumes := []v1.Volume{ - v1.Volume{ + { Name: "queryVolume", VolumeSource: v1.VolumeSource{}, }, } queryVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "queryVolume", }, } @@ -144,14 +144,14 @@ func TestQueryMountGlobalVolumes(t *testing.T) { name := "TestQueryMountGlobalVolumes" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "globalVolume", VolumeSource: v1.VolumeSource{}, }, } queryVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "globalVolume", ReadOnly: true, }, @@ -171,14 +171,14 @@ func TestQueryVolumeMountsWithSameName(t *testing.T) { name := "TestQueryVolumeMountsWithSameName" globalVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: true, }, } queryVolumeMounts := []v1.VolumeMount{ - v1.VolumeMount{ + { Name: "data", ReadOnly: false, }, @@ -198,14 +198,14 @@ func TestQueryVolumeWithSameName(t *testing.T) { name := "TestQueryVolumeWithSameName" globalVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}}, }, } queryVolumes := []v1.Volume{ - v1.Volume{ + { Name: "data", VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}}, }, From b39644623418897da3ecdc26668e98c2516c5d2b Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Thu, 17 Jan 2019 12:01:20 +0000 Subject: [PATCH 03/14] Update to use a separate new strategy call 'streaming' Signed-off-by: Gary Brown --- pkg/deployment/collector.go | 5 +- pkg/deployment/collector_test.go | 35 ++++++ pkg/deployment/ingester.go | 7 +- pkg/deployment/ingester_test.go | 21 ++-- pkg/service/ingester.go | 4 +- pkg/service/ingester_test.go | 34 ++++++ pkg/strategy/controller.go | 6 +- pkg/strategy/controller_test.go | 13 ++ pkg/strategy/production.go | 10 -- pkg/strategy/streaming.go | 125 +++++++++++++++++++ pkg/strategy/streaming_test.go | 199 +++++++++++++++++++++++++++++++ 11 files changed, 427 insertions(+), 32 deletions(-) create mode 100644 pkg/strategy/streaming.go create mode 100644 pkg/strategy/streaming_test.go diff --git a/pkg/deployment/collector.go b/pkg/deployment/collector.go index 8d4e8da44..1ba7b0f02 100644 --- a/pkg/deployment/collector.go +++ b/pkg/deployment/collector.go @@ -2,6 +2,7 @@ package deployment import ( "fmt" + "strings" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -65,9 +66,9 @@ func (c *Collector) Get() *appsv1.Deployment { } storageType := c.jaeger.Spec.Storage.Type - // If ingester options have been defined, then change storage type + // If sttategy is "streaming", then change storage type // to Kafka, and the storage options will be used in the Ingester instead - if len(c.jaeger.Spec.Ingester.Options.Map()) > 0 { + if strings.EqualFold(c.jaeger.Spec.Strategy, "streaming") { storageType = "kafka" } options := allArgs(c.jaeger.Spec.Collector.Options, diff --git a/pkg/deployment/collector_test.go b/pkg/deployment/collector_test.go index 97db3a366..8cff8d41e 100644 --- a/pkg/deployment/collector_test.go +++ b/pkg/deployment/collector_test.go @@ -316,6 +316,7 @@ func TestCollectorWithIngesterStorageType(t *testing.T) { Name: "TestCollectorWithIngesterStorageType", }, Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", Ingester: v1alpha1.JaegerIngesterSpec{ Options: v1alpha1.NewOptions(map[string]interface{}{ "kafka.topic": "mytopic", @@ -348,3 +349,37 @@ func TestCollectorWithIngesterStorageType(t *testing.T) { assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0]) assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[1]) } + +func TestCollectorWithIngesterNoOptionsStorageType(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: "TestCollectorWithIngesterNoOptionsStorageType", + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.brokers": "http://brokers", + "es.server-urls": "http://somewhere", + }), + }, + }, + } + collector := NewCollector(jaeger) + dep := collector.Get() + + envvars := []v1.EnvVar{ + { + Name: "SPAN_STORAGE_TYPE", + Value: "kafka", + }, + { + Name: "COLLECTOR_ZIPKIN_HTTP_PORT", + Value: "9411", + }, + } + assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 2) + assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[0]) +} diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index e3b992bb9..787ee6c09 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -2,6 +2,7 @@ package deployment import ( "fmt" + "strings" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -11,7 +12,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" - "github.com/jaegertracing/jaeger-operator/pkg/config/sampling" "github.com/jaegertracing/jaeger-operator/pkg/service" "github.com/jaegertracing/jaeger-operator/pkg/storage" "github.com/jaegertracing/jaeger-operator/pkg/util" @@ -37,8 +37,7 @@ func NewIngester(jaeger *v1alpha1.Jaeger) *Ingester { // Get returns a ingester pod func (i *Ingester) Get() *appsv1.Deployment { - // Only create a deployment if ingester options have been defined - if len(i.jaeger.Spec.Ingester.Options.Map()) == 0 { + if !strings.EqualFold(i.jaeger.Spec.Strategy, "streaming") { return nil } @@ -74,8 +73,6 @@ func (i *Ingester) Get() *appsv1.Deployment { i.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(i.jaeger.Spec.Storage.Type)), i.jaeger.Spec.Storage.Options.Filter("kafka")) - sampling.Update(i.jaeger, commonSpec, &options) - return &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", diff --git a/pkg/deployment/ingester_test.go b/pkg/deployment/ingester_test.go index 7bbe5df89..fdcf764ad 100644 --- a/pkg/deployment/ingester_test.go +++ b/pkg/deployment/ingester_test.go @@ -61,7 +61,7 @@ func TestDefaultIngesterImage(t *testing.T) { viper.Set("jaeger-version", "123") defer viper.Reset() - ingester := NewIngester(newIngesterJaeger("TestIngesterImage")) + ingester := NewIngester(newIngesterJaeger("TestDefaultIngesterImage")) dep := ingester.Get() containers := dep.Spec.Template.Spec.Containers @@ -144,9 +144,8 @@ func TestIngesterVolumeMountsWithVolumes(t *testing.T) { jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts podSpec := NewIngester(jaeger).Get().Spec.Template.Spec - // Additional 1 is sampling configmap - assert.Len(t, podSpec.Volumes, len(append(ingesterVolumes, globalVolumes...))+1) - assert.Len(t, podSpec.Containers[0].VolumeMounts, len(append(ingesterVolumeMounts, globalVolumeMounts...))+1) + assert.Len(t, podSpec.Volumes, len(append(ingesterVolumes, globalVolumes...))) + assert.Len(t, podSpec.Containers[0].VolumeMounts, len(append(ingesterVolumeMounts, globalVolumeMounts...))) // ingester is first while global is second assert.Equal(t, "ingesterVolume", podSpec.Volumes[0].Name) @@ -177,8 +176,7 @@ func TestIngesterMountGlobalVolumes(t *testing.T) { jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts podSpec := NewIngester(jaeger).Get().Spec.Template.Spec - // Count includes the sampling configmap - assert.Len(t, podSpec.Containers[0].VolumeMounts, 2) + assert.Len(t, podSpec.Containers[0].VolumeMounts, 1) // ingester volume is mounted assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].Name, "globalVolume") } @@ -205,8 +203,7 @@ func TestIngesterVolumeMountsWithSameName(t *testing.T) { jaeger.Spec.Ingester.VolumeMounts = ingesterVolumeMounts podSpec := NewIngester(jaeger).Get().Spec.Template.Spec - // Count includes the sampling configmap - assert.Len(t, podSpec.Containers[0].VolumeMounts, 2) + assert.Len(t, podSpec.Containers[0].VolumeMounts, 1) // ingester volume is mounted assert.Equal(t, podSpec.Containers[0].VolumeMounts[0].ReadOnly, false) } @@ -233,8 +230,7 @@ func TestIngesterVolumeWithSameName(t *testing.T) { jaeger.Spec.Ingester.Volumes = ingesterVolumes podSpec := NewIngester(jaeger).Get().Spec.Template.Spec - // Count includes the sampling configmap - assert.Len(t, podSpec.Volumes, 2) + assert.Len(t, podSpec.Volumes, 1) // ingester volume is mounted assert.Equal(t, podSpec.Volumes[0].VolumeSource.HostPath.Path, "/data2") } @@ -279,6 +275,7 @@ func TestIngesterWithStorageType(t *testing.T) { Name: "TestIngesterStorageType", }, Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", Ingester: v1alpha1.JaegerIngesterSpec{ Options: v1alpha1.NewOptions(map[string]interface{}{ "kafka.topic": "mytopic", @@ -303,11 +300,10 @@ func TestIngesterWithStorageType(t *testing.T) { }, } assert.Equal(t, envvars, dep.Spec.Template.Spec.Containers[0].Env) - assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 4) + assert.Len(t, dep.Spec.Template.Spec.Containers[0].Args, 3) assert.Equal(t, "--kafka.topic=mytopic", dep.Spec.Template.Spec.Containers[0].Args[0]) assert.Equal(t, "--es.server-urls=http://somewhere", dep.Spec.Template.Spec.Containers[0].Args[1]) assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[2]) - // Fourth arg is sampling strategy file } func newIngesterJaeger(name string) *v1alpha1.Jaeger { @@ -316,6 +312,7 @@ func newIngesterJaeger(name string) *v1alpha1.Jaeger { Name: name, }, Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", Ingester: v1alpha1.JaegerIngesterSpec{ Options: v1alpha1.NewOptions(map[string]interface{}{ "any": "option", diff --git a/pkg/service/ingester.go b/pkg/service/ingester.go index 5cc36a9bd..ecbd46801 100644 --- a/pkg/service/ingester.go +++ b/pkg/service/ingester.go @@ -2,6 +2,7 @@ package service import ( "fmt" + "strings" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -11,8 +12,7 @@ import ( // NewIngesterService returns a new Kubernetes service for Jaeger Ingester backed by the pods matching the selector func NewIngesterService(jaeger *v1alpha1.Jaeger, selector map[string]string) *v1.Service { - // Only create a service if ingester options have been defined - if len(jaeger.Spec.Ingester.Options.Map()) == 0 { + if !strings.EqualFold(jaeger.Spec.Strategy, "streaming") { return nil } diff --git a/pkg/service/ingester_test.go b/pkg/service/ingester_test.go index c4b7afe4e..e9b5fe020 100644 --- a/pkg/service/ingester_test.go +++ b/pkg/service/ingester_test.go @@ -19,6 +19,7 @@ func TestIngesterServiceNameAndPorts(t *testing.T) { Name: name, }, Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", Ingester: v1alpha1.JaegerIngesterSpec{ Options: v1alpha1.NewOptions(map[string]interface{}{ "any": "option", @@ -42,3 +43,36 @@ func TestIngesterServiceNameAndPorts(t *testing.T) { } } + +func TestIngesterNoServiceWrongStrategy(t *testing.T) { + name := "TestIngesterNoServiceWrongStrategy" + selector := map[string]string{"app": "myapp", "jaeger": name, "jaeger-component": "ingester"} + + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "production", + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "any": "option", + }), + }, + }, + } + assert.Nil(t, NewIngesterService(jaeger, selector)) +} + +func TestIngesterNoServiceMissingStrategy(t *testing.T) { + name := "TestIngesterNoServiceMissingStrategy" + selector := map[string]string{"app": "myapp", "jaeger": name, "jaeger-component": "ingester"} + + jaeger := &v1alpha1.Jaeger{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1alpha1.JaegerSpec{}, + } + assert.Nil(t, NewIngesterService(jaeger, selector)) +} diff --git a/pkg/strategy/controller.go b/pkg/strategy/controller.go index 014de8a9f..e47a700f4 100644 --- a/pkg/strategy/controller.go +++ b/pkg/strategy/controller.go @@ -36,6 +36,10 @@ func For(ctx context.Context, jaeger *v1alpha1.Jaeger) S { return newAllInOneStrategy(ctx, jaeger) } + if strings.ToLower(jaeger.Spec.Strategy) == "streaming" { + return newStreamingStrategy(ctx, jaeger) + } + return newProductionStrategy(ctx, jaeger) } @@ -65,7 +69,7 @@ func normalize(jaeger *v1alpha1.Jaeger) { } // normalize the deployment strategy - if strings.ToLower(jaeger.Spec.Strategy) != "production" { + if strings.ToLower(jaeger.Spec.Strategy) != "production" && strings.ToLower(jaeger.Spec.Strategy) != "streaming" { jaeger.Spec.Strategy = "allInOne" } diff --git a/pkg/strategy/controller_test.go b/pkg/strategy/controller_test.go index eb65a2298..3305aa67f 100644 --- a/pkg/strategy/controller_test.go +++ b/pkg/strategy/controller_test.go @@ -100,6 +100,19 @@ func TestIncompatibleStorageForProduction(t *testing.T) { assert.Equal(t, "allInOne", jaeger.Spec.Strategy) } +func TestIncompatibleStorageForStreaming(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Storage: v1alpha1.JaegerStorageSpec{ + Type: "memory", + }, + }, + } + normalize(jaeger) + assert.Equal(t, "allInOne", jaeger.Spec.Strategy) +} + func TestDeprecatedAllInOneStrategy(t *testing.T) { jaeger := &v1alpha1.Jaeger{ Spec: v1alpha1.JaegerSpec{ diff --git a/pkg/strategy/production.go b/pkg/strategy/production.go index 64b4bbeb6..6362845a8 100644 --- a/pkg/strategy/production.go +++ b/pkg/strategy/production.go @@ -37,7 +37,6 @@ func (c *productionStrategy) Create() []runtime.Object { collector := deployment.NewCollector(c.jaeger) query := deployment.NewQuery(c.jaeger) agent := deployment.NewAgent(c.jaeger) - ingester := deployment.NewIngester(c.jaeger) os := []runtime.Object{} // add all service accounts @@ -63,11 +62,6 @@ func (c *productionStrategy) Create() []runtime.Object { inject.OAuthProxy(c.jaeger, query.Get()), ) - ingesterDeployment := ingester.Get() - if ingesterDeployment != nil { - os = append(os, ingesterDeployment) - } - if ds := agent.Get(); nil != ds { os = append(os, ds) } @@ -81,10 +75,6 @@ func (c *productionStrategy) Create() []runtime.Object { os = append(os, svc) } - for _, svc := range ingester.Services() { - os = append(os, svc) - } - // add the routes/ingresses if viper.GetString("platform") == v1alpha1.FlagPlatformOpenShift { if q := route.NewQueryRoute(c.jaeger).Get(); nil != q { diff --git a/pkg/strategy/streaming.go b/pkg/strategy/streaming.go new file mode 100644 index 000000000..ac3f7240f --- /dev/null +++ b/pkg/strategy/streaming.go @@ -0,0 +1,125 @@ +package strategy + +import ( + "context" + "strings" + + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jaegertracing/jaeger-operator/pkg/account" + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/config/sampling" + "github.com/jaegertracing/jaeger-operator/pkg/config/ui" + "github.com/jaegertracing/jaeger-operator/pkg/cronjob" + "github.com/jaegertracing/jaeger-operator/pkg/deployment" + "github.com/jaegertracing/jaeger-operator/pkg/ingress" + "github.com/jaegertracing/jaeger-operator/pkg/inject" + "github.com/jaegertracing/jaeger-operator/pkg/route" + "github.com/jaegertracing/jaeger-operator/pkg/storage" +) + +type streamingStrategy struct { + ctx context.Context + jaeger *v1alpha1.Jaeger +} + +func newStreamingStrategy(ctx context.Context, jaeger *v1alpha1.Jaeger) *streamingStrategy { + return &streamingStrategy{ + ctx: ctx, + jaeger: jaeger, + } +} + +func (c *streamingStrategy) Create() []runtime.Object { + collector := deployment.NewCollector(c.jaeger) + query := deployment.NewQuery(c.jaeger) + agent := deployment.NewAgent(c.jaeger) + ingester := deployment.NewIngester(c.jaeger) + os := []runtime.Object{} + + // add all service accounts + for _, acc := range account.Get(c.jaeger) { + os = append(os, acc) + } + + // add the config map + cm := configmap.NewUIConfig(c.jaeger).Get() + if nil != cm { + os = append(os, cm) + } + + // add the Sampling config map + scmp := sampling.NewConfig(c.jaeger).Get() + if nil != scmp { + os = append(os, scmp) + } + + // add the deployments + os = append(os, + collector.Get(), + inject.OAuthProxy(c.jaeger, query.Get()), + ) + + ingesterDeployment := ingester.Get() + if ingesterDeployment != nil { + os = append(os, ingesterDeployment) + } + + if ds := agent.Get(); nil != ds { + os = append(os, ds) + } + + // add the services + for _, svc := range collector.Services() { + os = append(os, svc) + } + + for _, svc := range query.Services() { + os = append(os, svc) + } + + for _, svc := range ingester.Services() { + os = append(os, svc) + } + + // add the routes/ingresses + if viper.GetString("platform") == v1alpha1.FlagPlatformOpenShift { + if q := route.NewQueryRoute(c.jaeger).Get(); nil != q { + os = append(os, q) + } + } else { + if q := ingress.NewQueryIngress(c.jaeger).Get(); nil != q { + os = append(os, q) + } + } + + if isBoolTrue(c.jaeger.Spec.Storage.SparkDependencies.Enabled) { + if cronjob.SupportedStorage(c.jaeger.Spec.Storage.Type) { + os = append(os, cronjob.CreateSparkDependencies(c.jaeger)) + } else { + logrus.WithField("type", c.jaeger.Spec.Storage.Type).Warn("Skipping spark dependencies job due to unsupported storage.") + } + } + + if isBoolTrue(c.jaeger.Spec.Storage.EsIndexCleaner.Enabled) { + if strings.ToLower(c.jaeger.Spec.Storage.Type) == "elasticsearch" { + os = append(os, cronjob.CreateEsIndexCleaner(c.jaeger)) + } else { + logrus.WithField("type", c.jaeger.Spec.Storage.Type).Warn("Skipping Elasticsearch index cleaner job due to unsupported storage.") + } + } + + return os +} + +func (c *streamingStrategy) Update() []runtime.Object { + logrus.Debug("Update isn't yet available") + return []runtime.Object{} +} + +func (c *streamingStrategy) Dependencies() []batchv1.Job { + return storage.Dependencies(c.jaeger) +} diff --git a/pkg/strategy/streaming_test.go b/pkg/strategy/streaming_test.go new file mode 100644 index 000000000..1554ee7e3 --- /dev/null +++ b/pkg/strategy/streaming_test.go @@ -0,0 +1,199 @@ +package strategy + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" + "github.com/jaegertracing/jaeger-operator/pkg/storage" +) + +func init() { + viper.SetDefault("jaeger-version", "1.6") + viper.SetDefault("jaeger-agent-image", "jaegertracing/jaeger-agent") +} + +func TestCreateStreamingDeployment(t *testing.T) { + name := "TestCreateStreamingDeployment" + c := newStreamingStrategy(context.TODO(), v1alpha1.NewJaeger(name)) + objs := c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, false, false, false) +} + +func TestCreateStreamingDeploymentOnOpenShift(t *testing.T) { + viper.Set("platform", "openshift") + defer viper.Reset() + name := "TestCreateStreamingDeploymentOnOpenShift" + + jaeger := v1alpha1.NewJaeger(name) + normalize(jaeger) + + c := newStreamingStrategy(context.TODO(), jaeger) + objs := c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, false, true, false) +} + +func TestCreateStreamingDeploymentWithDaemonSetAgent(t *testing.T) { + name := "TestCreateStreamingDeploymentWithDaemonSetAgent" + + j := v1alpha1.NewJaeger(name) + j.Spec.Agent.Strategy = "DaemonSet" + + c := newStreamingStrategy(context.TODO(), j) + objs := c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, true, false, false) +} + +func TestCreateStreamingDeploymentWithUIConfigMap(t *testing.T) { + name := "TestCreateStreamingDeploymentWithUIConfigMap" + + j := v1alpha1.NewJaeger(name) + j.Spec.UI.Options = v1alpha1.NewFreeForm(map[string]interface{}{ + "tracking": map[string]interface{}{ + "gaID": "UA-000000-2", + }, + }) + + c := newStreamingStrategy(context.TODO(), j) + objs := c.Create() + assertDeploymentsAndServicesForStreaming(t, name, objs, false, false, true) +} + +func TestUpdateStreamingDeployment(t *testing.T) { + name := "TestUpdateStreamingDeployment" + c := newStreamingStrategy(context.TODO(), v1alpha1.NewJaeger(name)) + assert.Len(t, c.Update(), 0) +} + +func TestStreamingOptionsArePassed(t *testing.T) { + jaeger := &v1alpha1.Jaeger{ + TypeMeta: metav1.TypeMeta{ + Kind: "Jaeger", + APIVersion: "io.jaegertracing/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-prod", + Namespace: "simple-prod-ns", + }, + Spec: v1alpha1.JaegerSpec{ + Strategy: "streaming", + Ingester: v1alpha1.JaegerIngesterSpec{ + Options: v1alpha1.NewOptions(map[string]interface{}{ + "kafka.topic": "mytopic", + }), + }, + Storage: v1alpha1.JaegerStorageSpec{ + Type: "elasticsearch", + Options: v1alpha1.NewOptions(map[string]interface{}{ + "es.server-urls": "http://elasticsearch.default.svc:9200", + "es.username": "elastic", + "es.password": "changeme", + }), + }, + }, + } + + ctrl := For(context.TODO(), jaeger) + objs := ctrl.Create() + deployments := getDeployments(objs) + for _, dep := range deployments { + args := dep.Spec.Template.Spec.Containers[0].Args + // Only the query and ingester should have the ES parameters defined + var escount int + for _, arg := range args { + if strings.Contains(arg, "es.") { + escount++ + } + } + if strings.Contains(dep.Name, "collector") { + // Including parameters for sampling config and kafka topic + assert.Len(t, args, 2) + assert.Equal(t, 0, escount) + } else if strings.Contains(dep.Name, "ingester") { + // Including parameters for ES and kafka topic + assert.Len(t, args, 4) + assert.Equal(t, 3, escount) + + } else { + // Including parameters for ES only + assert.Len(t, args, 3) + assert.Equal(t, 3, escount) + } + } +} + +func TestDelegateStreamingDepedencies(t *testing.T) { + // for now, we just have storage dependencies + c := newStreamingStrategy(context.TODO(), v1alpha1.NewJaeger("TestDelegateStreamingDepedencies")) + assert.Equal(t, c.Dependencies(), storage.Dependencies(c.jaeger)) +} + +func assertDeploymentsAndServicesForStreaming(t *testing.T, name string, objs []runtime.Object, hasDaemonSet bool, hasOAuthProxy bool, hasConfigMap bool) { + expectedNumObjs := 6 + + if hasDaemonSet { + expectedNumObjs++ + } + + if hasOAuthProxy { + expectedNumObjs++ + } + + if hasConfigMap { + expectedNumObjs++ + } + + assert.Len(t, objs, expectedNumObjs) + + deployments := map[string]bool{ + fmt.Sprintf("%s-collector", name): false, + fmt.Sprintf("%s-query", name): false, + } + + daemonsets := map[string]bool{ + fmt.Sprintf("%s-agent-daemonset", name): !hasDaemonSet, + } + + services := map[string]bool{ + fmt.Sprintf("%s-collector", name): false, + fmt.Sprintf("%s-query", name): false, + } + + ingresses := map[string]bool{} + routes := map[string]bool{} + if viper.GetString("platform") == v1alpha1.FlagPlatformOpenShift { + routes[name] = false + } else { + ingresses[fmt.Sprintf("%s-query", name)] = false + } + + serviceAccounts := map[string]bool{} + if hasOAuthProxy { + serviceAccounts[fmt.Sprintf("%s-ui-proxy", name)] = false + } + + configMaps := map[string]bool{} + if hasConfigMap { + configMaps[fmt.Sprintf("%s-ui-configuration", name)] = false + } + assertHasAllObjects(t, name, objs, deployments, daemonsets, services, ingresses, routes, serviceAccounts, configMaps) +} + +func TestSparkDependenciesStreaming(t *testing.T) { + testSparkDependencies(t, func(jaeger *v1alpha1.Jaeger) S { + return &streamingStrategy{jaeger: jaeger} + }) +} + +func TestEsIndexClenarStreaming(t *testing.T) { + testEsIndexCleaner(t, func(jaeger *v1alpha1.Jaeger) S { + return &streamingStrategy{jaeger: jaeger} + }) +} From e91e86bd9894f7281d89241dcbe0939cb720b04b Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 18 Jan 2019 11:12:34 +0000 Subject: [PATCH 04/14] Initial version of docs Signed-off-by: Gary Brown --- README.adoc | 50 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/README.adoc b/README.adoc index 76c2db777..0166f9dc5 100644 --- a/README.adoc +++ b/README.adoc @@ -139,7 +139,7 @@ spec: annotations: scheduler.alpha.kubernetes.io/critical-pod: "" # <10> ---- -<1> The default strategy is `allInOne`. The only other possible value is `production`. +<1> The default strategy is `allInOne`. The only other possible values are `production` and `streaming`. <2> The image to use, in a regular Docker syntax <3> The (non-storage related) options to be passed verbatim to the underlying binary. Refer to the Jaeger documentation and/or to the `--help` option from the related binary for all the available options. <4> The option is a simple `key: value` map. In this case, we want the option `--log-level=debug` to be passed to the binary. @@ -150,6 +150,52 @@ spec: <9> By default, the operator assumes that agents are deployed as sidecars within the target pods. Specifying the strategy as "DaemonSet" changes that and makes the operator deploy the agent as DaemonSet. Note that your tracer client will probably have to override the "JAEGER_AGENT_HOST" env var to use the node's IP. <10> Define annotations to be applied to all deployments (not services). These can be overridden by annotations defined on the individual components. +== Strategies + +As shown in the example above, the Jaeger instance is associated with a strategy. The strategy determines the architecture to be used for the Jaeger backend. + +The available strategies are described in the following sections. + +=== AllInOne (Default) + +This strategy is intended for development, testing and demo purposes. + +The main backend components, agent, collector and query service, are all packaged into a single executable which is configured (by default) to use in-memory storage. + +=== Production + +The `production` strategy is intended (as the name suggests) for production environments, where long term storage of trace data is important, as well as a more scalable and highly available architecture is required. Each of the backend components is therefore separately deployed. + +The agent can be injected as a sidecar on the instrumented application or as a daemonset. + +The query and collector services are configured with a supported storage type - currently cassandra or elasticsearch. Multiple instances of each of these components can be provisions as required for performance and resilience purposes. + +The main additional requirement is to provide the details of the storage type and options, e.g. + +[source,yaml] +---- + storage: + type: elasticsearch + options: + es: + server-urls: http://elasticsearch:9200 +---- + +=== Streaming + +The `streaming` strategy is designed to augment the `production` strategy by providing a streaming capability that effectively sits between the collector and the backend storage (e.g. cassandra or elasticaearch). This provides the benefit of reducing the pressure on the backend storage, under high load situations, and enables other trace post processing capabilities to tap into the real time span data directly from the streaming platform (kafka). + +The only additional information required is to provide the details for accessing the Kafka platform, which is configured in a new `ingester` component: + +[source,yaml] +---- + ingester: + options: + kafka: + topic: jaeger-spans + brokers: 127.0.0.1:9092 +---- + == Accessing the UI === Kubernetes @@ -354,7 +400,7 @@ spec: datacenter: "datacenter3" mode: "test" ---- -<1> The same works for `production` +<1> The same works for `production` and `streaming` <2> These options are for the regular Jaeger components, like `collector` and `query` <3> The options for the `create-schema` job From d5bec5347ab3f424da4f44354029cb0e6f5bb410 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 18 Jan 2019 11:33:10 +0000 Subject: [PATCH 05/14] Fix readme based on review comments Signed-off-by: Gary Brown --- README.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.adoc b/README.adoc index 0166f9dc5..892bb082e 100644 --- a/README.adoc +++ b/README.adoc @@ -168,7 +168,7 @@ The `production` strategy is intended (as the name suggests) for production envi The agent can be injected as a sidecar on the instrumented application or as a daemonset. -The query and collector services are configured with a supported storage type - currently cassandra or elasticsearch. Multiple instances of each of these components can be provisions as required for performance and resilience purposes. +The query and collector services are configured with a supported storage type - currently cassandra or elasticsearch. Multiple instances of each of these components can be provisioned as required for performance and resilience purposes. The main additional requirement is to provide the details of the storage type and options, e.g. @@ -183,7 +183,7 @@ The main additional requirement is to provide the details of the storage type an === Streaming -The `streaming` strategy is designed to augment the `production` strategy by providing a streaming capability that effectively sits between the collector and the backend storage (e.g. cassandra or elasticaearch). This provides the benefit of reducing the pressure on the backend storage, under high load situations, and enables other trace post processing capabilities to tap into the real time span data directly from the streaming platform (kafka). +The `streaming` strategy is designed to augment the `production` strategy by providing a streaming capability that effectively sits between the collector and the backend storage (e.g. cassandra or elasticsearch). This provides the benefit of reducing the pressure on the backend storage, under high load situations, and enables other trace post processing capabilities to tap into the real time span data directly from the streaming platform (kafka). The only additional information required is to provide the details for accessing the Kafka platform, which is configured in a new `ingester` component: From fb60b827dc6b9fadb00ea8c5bdeb5df8da7ec135 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 18 Jan 2019 15:04:54 +0000 Subject: [PATCH 06/14] Fix readiness port (although still seems to fail) and add example jaeger instance for streaming Signed-off-by: Gary Brown --- deploy/examples/simple-streaming.yaml | 18 ++++++++++++++++++ pkg/deployment/ingester.go | 11 +++++------ 2 files changed, 23 insertions(+), 6 deletions(-) create mode 100644 deploy/examples/simple-streaming.yaml diff --git a/deploy/examples/simple-streaming.yaml b/deploy/examples/simple-streaming.yaml new file mode 100644 index 000000000..b61f58cd2 --- /dev/null +++ b/deploy/examples/simple-streaming.yaml @@ -0,0 +1,18 @@ +# setup an elasticsearch with `make es` +# setup a kafka platform using https://strimzi.io with quickstart instructions +apiVersion: io.jaegertracing/v1alpha1 +kind: Jaeger +metadata: + name: simple-streaming +spec: + strategy: streaming + ingester: + options: + kafka: + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 + storage: + type: elasticsearch + options: + es: + server-urls: http://elasticsearch:9200 diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index 787ee6c09..5dff206fc 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -49,9 +49,8 @@ func (i *Ingester) Get() *appsv1.Deployment { baseCommonSpec := v1alpha1.JaegerCommonSpec{ Annotations: map[string]string{ - "prometheus.io/scrape": "true", - // TODO: Need to check if metrics port is exposed, and on what port number - "prometheus.io/port": "14268", + "prometheus.io/scrape": "true", + "prometheus.io/port": "2345", "sidecar.istio.io/inject": "false", }, } @@ -116,15 +115,15 @@ func (i *Ingester) Get() *appsv1.Deployment { EnvFrom: envFromSource, Ports: []v1.ContainerPort{ { - ContainerPort: 14267, - Name: "c-tchan-trft", + ContainerPort: 2345, + Name: "ingester-http", }, }, ReadinessProbe: &v1.Probe{ Handler: v1.Handler{ HTTPGet: &v1.HTTPGetAction{ Path: "/", - Port: intstr.FromInt(14267), + Port: intstr.FromInt(2345), }, }, InitialDelaySeconds: 1, From 225c86b82f6a5d733b0083e3bcd88dbd2056f2da Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 18 Jan 2019 15:22:16 +0000 Subject: [PATCH 07/14] Fix health/metrics ports for ingester Signed-off-by: Gary Brown --- pkg/deployment/ingester.go | 12 ++++++++---- pkg/service/ingester.go | 8 ++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index 5dff206fc..cdb267a02 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -50,7 +50,7 @@ func (i *Ingester) Get() *appsv1.Deployment { baseCommonSpec := v1alpha1.JaegerCommonSpec{ Annotations: map[string]string{ "prometheus.io/scrape": "true", - "prometheus.io/port": "2345", + "prometheus.io/port": "14271", "sidecar.istio.io/inject": "false", }, } @@ -115,15 +115,19 @@ func (i *Ingester) Get() *appsv1.Deployment { EnvFrom: envFromSource, Ports: []v1.ContainerPort{ { - ContainerPort: 2345, - Name: "ingester-http", + ContainerPort: 14270, + Name: "hc-http", + }, + { + ContainerPort: 14271, + Name: "metrics-http", }, }, ReadinessProbe: &v1.Probe{ Handler: v1.Handler{ HTTPGet: &v1.HTTPGetAction{ Path: "/", - Port: intstr.FromInt(2345), + Port: intstr.FromInt(14270), }, }, InitialDelaySeconds: 1, diff --git a/pkg/service/ingester.go b/pkg/service/ingester.go index ecbd46801..86f9de0e3 100644 --- a/pkg/service/ingester.go +++ b/pkg/service/ingester.go @@ -42,8 +42,12 @@ func NewIngesterService(jaeger *v1alpha1.Jaeger, selector map[string]string) *v1 ClusterIP: "None", Ports: []v1.ServicePort{ { - Name: "c-tchan-trft", - Port: 14267, + Name: "hc-http", + Port: 14270, + }, + { + Name: "metrics-http", + Port: 14271, }, }, }, From 21dc7e6fed14cba9a0485d3c871aa200e5df466c Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Wed, 23 Jan 2019 14:11:01 +0000 Subject: [PATCH 08/14] Update labels inline with k8s recommendations Signed-off-by: Gary Brown --- pkg/deployment/ingester.go | 25 ++++++++++++++++++------- pkg/deployment/ingester_test.go | 10 ++++++++++ pkg/service/ingester_test.go | 3 ++- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index cdb267a02..6e81b031c 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -43,7 +43,7 @@ func (i *Ingester) Get() *appsv1.Deployment { logrus.Debugf("Assembling a ingester deployment for %v", i.jaeger) - selector := i.selector() + labels := i.labels() trueVar := true replicas := int32(i.jaeger.Spec.Ingester.Size) @@ -78,7 +78,7 @@ func (i *Ingester) Get() *appsv1.Deployment { Kind: "Deployment", }, ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-ingester", i.jaeger.Name), + Name: i.name(), Namespace: i.jaeger.Namespace, OwnerReferences: []metav1.OwnerReference{ metav1.OwnerReference{ @@ -93,11 +93,11 @@ func (i *Ingester) Get() *appsv1.Deployment { Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ - MatchLabels: selector, + MatchLabels: labels, }, Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: selector, + Labels: labels, Annotations: commonSpec.Annotations, }, Spec: v1.PodSpec{ @@ -145,7 +145,7 @@ func (i *Ingester) Get() *appsv1.Deployment { func (i *Ingester) Services() []*v1.Service { services := []*v1.Service{} - service := service.NewIngesterService(i.jaeger, i.selector()) + service := service.NewIngesterService(i.jaeger, i.labels()) if service != nil { services = append(services, service) @@ -154,6 +154,17 @@ func (i *Ingester) Services() []*v1.Service { return services } -func (i *Ingester) selector() map[string]string { - return map[string]string{"app": "jaeger", "jaeger": i.jaeger.Name, "jaeger-component": "ingester"} +func (i *Ingester) labels() map[string]string { + return map[string]string{ + "app": "jaeger", // TODO(jpkroehling): see collector.go in this package + "app.kubernetes.io/name": i.name(), + "app.kubernetes.io/instance": i.jaeger.Name, + "app.kubernetes.io/component": "ingester", + "app.kubernetes.io/part-of": "jaeger", + "app.kubernetes.io/managed-by": "jaeger-operator", + } +} + +func (i *Ingester) name() string { + return fmt.Sprintf("%s-ingester", i.jaeger.Name) } diff --git a/pkg/deployment/ingester_test.go b/pkg/deployment/ingester_test.go index fdcf764ad..c18b2a494 100644 --- a/pkg/deployment/ingester_test.go +++ b/pkg/deployment/ingester_test.go @@ -1,6 +1,7 @@ package deployment import ( + "fmt" "testing" "github.com/spf13/viper" @@ -306,6 +307,15 @@ func TestIngesterWithStorageType(t *testing.T) { assert.Equal(t, "--kafka.brokers=http://brokers", dep.Spec.Template.Spec.Containers[0].Args[2]) } +func TestIngesterLabels(t *testing.T) { + ingester := NewIngester(newIngesterJaeger("TestIngesterLabels")) + dep := ingester.Get() + assert.Equal(t, "jaeger-operator", dep.Spec.Template.Labels["app.kubernetes.io/managed-by"]) + assert.Equal(t, "ingester", dep.Spec.Template.Labels["app.kubernetes.io/component"]) + assert.Equal(t, ingester.jaeger.Name, dep.Spec.Template.Labels["app.kubernetes.io/instance"]) + assert.Equal(t, fmt.Sprintf("%s-ingester", ingester.jaeger.Name), dep.Spec.Template.Labels["app.kubernetes.io/name"]) +} + func newIngesterJaeger(name string) *v1alpha1.Jaeger { return &v1alpha1.Jaeger{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/service/ingester_test.go b/pkg/service/ingester_test.go index e9b5fe020..3078e97d0 100644 --- a/pkg/service/ingester_test.go +++ b/pkg/service/ingester_test.go @@ -31,7 +31,8 @@ func TestIngesterServiceNameAndPorts(t *testing.T) { assert.Equal(t, svc.ObjectMeta.Name, fmt.Sprintf("%s-ingester", name)) ports := map[int32]bool{ - 14267: false, + 14270: false, + 14271: false, } for _, port := range svc.Spec.Ports { From 30dfdee055e8ca12c05f6b0be0bd56d869f17df4 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Wed, 23 Jan 2019 15:38:41 +0000 Subject: [PATCH 09/14] format fix Signed-off-by: Gary Brown --- pkg/deployment/ingester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index 6e81b031c..e1f201ca0 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -156,7 +156,7 @@ func (i *Ingester) Services() []*v1.Service { func (i *Ingester) labels() map[string]string { return map[string]string{ - "app": "jaeger", // TODO(jpkroehling): see collector.go in this package + "app": "jaeger", // TODO(jpkroehling): see collector.go in this package "app.kubernetes.io/name": i.name(), "app.kubernetes.io/instance": i.jaeger.Name, "app.kubernetes.io/component": "ingester", From 30fbeee2dcda870862c7a5c0a8d054854a58dc88 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Thu, 24 Jan 2019 17:23:09 +0000 Subject: [PATCH 10/14] Add debug logging and ingester deadlockInterval (disabled) to avoid restarts when just demo/testing Signed-off-by: Gary Brown --- deploy/examples/simple-streaming.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/deploy/examples/simple-streaming.yaml b/deploy/examples/simple-streaming.yaml index b61f58cd2..88b4f2da1 100644 --- a/deploy/examples/simple-streaming.yaml +++ b/deploy/examples/simple-streaming.yaml @@ -6,11 +6,17 @@ metadata: name: simple-streaming spec: strategy: streaming + collector: + options: + log-level: debug ingester: options: kafka: topic: jaeger-spans brokers: my-cluster-kafka-brokers.kafka:9092 + ingester: + deadlockInterval: 0 + log-level: debug storage: type: elasticsearch options: From 26c1c9cf2ea2e661a8d91de735e06ceddcffcb41 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 25 Jan 2019 11:11:59 +0000 Subject: [PATCH 11/14] Remove ingester service as not required Signed-off-by: Gary Brown --- pkg/deployment/ingester.go | 17 +++---- pkg/deployment/ingester_test.go | 2 +- pkg/service/ingester.go | 60 ------------------------- pkg/service/ingester_test.go | 79 --------------------------------- 4 files changed, 6 insertions(+), 152 deletions(-) delete mode 100644 pkg/service/ingester.go delete mode 100644 pkg/service/ingester_test.go diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index e1f201ca0..177e5f5e0 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -12,7 +12,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" - "github.com/jaegertracing/jaeger-operator/pkg/service" "github.com/jaegertracing/jaeger-operator/pkg/storage" "github.com/jaegertracing/jaeger-operator/pkg/util" ) @@ -141,22 +140,16 @@ func (i *Ingester) Get() *appsv1.Deployment { } } -// Services returns a list of services to be deployed along with the ingesterdeployment +// Services returns a list of services to be deployed along with the ingester deployment func (i *Ingester) Services() []*v1.Service { - services := []*v1.Service{} - - service := service.NewIngesterService(i.jaeger, i.labels()) - - if service != nil { - services = append(services, service) - } - - return services + // Return empty list, as a service is not required for this deployment, which also + // simplifies switching between different strategies. + return []*v1.Service{} } func (i *Ingester) labels() map[string]string { return map[string]string{ - "app": "jaeger", // TODO(jpkroehling): see collector.go in this package + "app": "jaeger", // TODO(jpkroehling): see collector.go in this package "app.kubernetes.io/name": i.name(), "app.kubernetes.io/instance": i.jaeger.Name, "app.kubernetes.io/component": "ingester", diff --git a/pkg/deployment/ingester_test.go b/pkg/deployment/ingester_test.go index c18b2a494..465feae7d 100644 --- a/pkg/deployment/ingester_test.go +++ b/pkg/deployment/ingester_test.go @@ -54,7 +54,7 @@ func TestIngesterServices(t *testing.T) { jaeger := newIngesterJaeger("TestIngesterServices") ingester := NewIngester(jaeger) svcs := ingester.Services() - assert.Len(t, svcs, 1) + assert.Len(t, svcs, 0) } func TestDefaultIngesterImage(t *testing.T) { diff --git a/pkg/service/ingester.go b/pkg/service/ingester.go deleted file mode 100644 index 86f9de0e3..000000000 --- a/pkg/service/ingester.go +++ /dev/null @@ -1,60 +0,0 @@ -package service - -import ( - "fmt" - "strings" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" -) - -// NewIngesterService returns a new Kubernetes service for Jaeger Ingester backed by the pods matching the selector -func NewIngesterService(jaeger *v1alpha1.Jaeger, selector map[string]string) *v1.Service { - if !strings.EqualFold(jaeger.Spec.Strategy, "streaming") { - return nil - } - - trueVar := true - - return &v1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: GetNameForIngesterService(jaeger), - Namespace: jaeger.Namespace, - Labels: selector, - OwnerReferences: []metav1.OwnerReference{ - metav1.OwnerReference{ - APIVersion: jaeger.APIVersion, - Kind: jaeger.Kind, - Name: jaeger.Name, - UID: jaeger.UID, - Controller: &trueVar, - }, - }, - }, - Spec: v1.ServiceSpec{ - Selector: selector, - ClusterIP: "None", - Ports: []v1.ServicePort{ - { - Name: "hc-http", - Port: 14270, - }, - { - Name: "metrics-http", - Port: 14271, - }, - }, - }, - } -} - -// GetNameForIngesterService returns the service name for the ingester in this Jaeger instance -func GetNameForIngesterService(jaeger *v1alpha1.Jaeger) string { - return fmt.Sprintf("%s-ingester", jaeger.Name) -} diff --git a/pkg/service/ingester_test.go b/pkg/service/ingester_test.go deleted file mode 100644 index 3078e97d0..000000000 --- a/pkg/service/ingester_test.go +++ /dev/null @@ -1,79 +0,0 @@ -package service - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/jaegertracing/jaeger-operator/pkg/apis/io/v1alpha1" -) - -func TestIngesterServiceNameAndPorts(t *testing.T) { - name := "TestIngesterServiceNameAndPorts" - selector := map[string]string{"app": "myapp", "jaeger": name, "jaeger-component": "ingester"} - - jaeger := &v1alpha1.Jaeger{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: v1alpha1.JaegerSpec{ - Strategy: "streaming", - Ingester: v1alpha1.JaegerIngesterSpec{ - Options: v1alpha1.NewOptions(map[string]interface{}{ - "any": "option", - }), - }, - }, - } - svc := NewIngesterService(jaeger, selector) - assert.Equal(t, svc.ObjectMeta.Name, fmt.Sprintf("%s-ingester", name)) - - ports := map[int32]bool{ - 14270: false, - 14271: false, - } - - for _, port := range svc.Spec.Ports { - ports[port.Port] = true - } - - for k, v := range ports { - assert.Equal(t, v, true, "Expected port %v to be specified, but wasn't", k) - } - -} - -func TestIngesterNoServiceWrongStrategy(t *testing.T) { - name := "TestIngesterNoServiceWrongStrategy" - selector := map[string]string{"app": "myapp", "jaeger": name, "jaeger-component": "ingester"} - - jaeger := &v1alpha1.Jaeger{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: v1alpha1.JaegerSpec{ - Strategy: "production", - Ingester: v1alpha1.JaegerIngesterSpec{ - Options: v1alpha1.NewOptions(map[string]interface{}{ - "any": "option", - }), - }, - }, - } - assert.Nil(t, NewIngesterService(jaeger, selector)) -} - -func TestIngesterNoServiceMissingStrategy(t *testing.T) { - name := "TestIngesterNoServiceMissingStrategy" - selector := map[string]string{"app": "myapp", "jaeger": name, "jaeger-component": "ingester"} - - jaeger := &v1alpha1.Jaeger{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: v1alpha1.JaegerSpec{}, - } - assert.Nil(t, NewIngesterService(jaeger, selector)) -} From cc48ba498c5faa5ebeddda668930a5dce84aa4ad Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 25 Jan 2019 11:12:21 +0000 Subject: [PATCH 12/14] fix format Signed-off-by: Gary Brown --- pkg/deployment/ingester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/deployment/ingester.go b/pkg/deployment/ingester.go index 177e5f5e0..251ca15b0 100644 --- a/pkg/deployment/ingester.go +++ b/pkg/deployment/ingester.go @@ -149,7 +149,7 @@ func (i *Ingester) Services() []*v1.Service { func (i *Ingester) labels() map[string]string { return map[string]string{ - "app": "jaeger", // TODO(jpkroehling): see collector.go in this package + "app": "jaeger", // TODO(jpkroehling): see collector.go in this package "app.kubernetes.io/name": i.name(), "app.kubernetes.io/instance": i.jaeger.Name, "app.kubernetes.io/component": "ingester", From cdf29eb7d9545c3c2c0e2630ffd2f66f5f1fabe2 Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 25 Jan 2019 14:51:39 +0000 Subject: [PATCH 13/14] Address review comments Signed-off-by: Gary Brown --- README.adoc | 25 ++++++++++++++++++++----- pkg/deployment/collector.go | 2 +- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/README.adoc b/README.adoc index 892bb082e..2e5ba21c3 100644 --- a/README.adoc +++ b/README.adoc @@ -189,12 +189,27 @@ The only additional information required is to provide the details for accessing [source,yaml] ---- - ingester: - options: - kafka: - topic: jaeger-spans - brokers: 127.0.0.1:9092 +apiVersion: io.jaegertracing/v1alpha1 +kind: Jaeger +metadata: + name: simple-streaming +spec: + strategy: streaming + ingester: + options: + kafka: # <1> + topic: jaeger-spans + brokers: my-cluster-kafka-brokers.kafka:9092 + ingester: + deadlockInterval: 0 # <2> + storage: + type: elasticsearch + options: + es: + server-urls: http://elasticsearch:9200 ---- +<1> Identifies the kafka configuration used by the collector, to produce the messages, and the ingester to consume the messages +<2> The deadlock interval can be disabled to avoid the ingester being terminated when no messages arrive within the default 1 minute period == Accessing the UI diff --git a/pkg/deployment/collector.go b/pkg/deployment/collector.go index 1ba7b0f02..84ca66a0c 100644 --- a/pkg/deployment/collector.go +++ b/pkg/deployment/collector.go @@ -66,7 +66,7 @@ func (c *Collector) Get() *appsv1.Deployment { } storageType := c.jaeger.Spec.Storage.Type - // If sttategy is "streaming", then change storage type + // If strategy is "streaming", then change storage type // to Kafka, and the storage options will be used in the Ingester instead if strings.EqualFold(c.jaeger.Spec.Strategy, "streaming") { storageType = "kafka" From c21b0c2e57f2f06c1b51717c1e6aba99646f0c2d Mon Sep 17 00:00:00 2001 From: Gary Brown Date: Fri, 25 Jan 2019 15:40:57 +0000 Subject: [PATCH 14/14] Add TODO comment in streaming strategy Signed-off-by: Gary Brown --- pkg/strategy/streaming.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/strategy/streaming.go b/pkg/strategy/streaming.go index ac3f7240f..c9941802b 100644 --- a/pkg/strategy/streaming.go +++ b/pkg/strategy/streaming.go @@ -34,6 +34,8 @@ func newStreamingStrategy(ctx context.Context, jaeger *v1alpha1.Jaeger) *streami } func (c *streamingStrategy) Create() []runtime.Object { + // TODO: Look at ways to refactor this, with the production strategy Create(), to reuse + // common elements. collector := deployment.NewCollector(c.jaeger) query := deployment.NewQuery(c.jaeger) agent := deployment.NewAgent(c.jaeger)