Add ingester (and kafka) support #168

Merged: 14 commits, Jan 28, 2019
Changes from 13 commits
65 changes: 63 additions & 2 deletions README.adoc
@@ -139,7 +139,7 @@ spec:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: "" # <10>
----
<1> The default strategy is `allInOne`. The only other possible value is `production`.
<1> The default strategy is `allInOne`. The other possible values are `production` and `streaming`.
<2> The image to use, in regular Docker syntax.
<3> The (non-storage related) options to be passed verbatim to the underlying binary. Refer to the Jaeger documentation and/or to the `--help` option from the related binary for all the available options.
<4> The option is a simple `key: value` map. In this case, we want the option `--log-level=debug` to be passed to the binary.
@@ -150,6 +150,67 @@ spec:
<9> By default, the operator assumes that agents are deployed as sidecars within the target pods. Specifying the strategy as "DaemonSet" changes that and makes the operator deploy the agent as a DaemonSet. Note that your tracer client will probably have to override the "JAEGER_AGENT_HOST" env var to use the node's IP.
<10> Define annotations to be applied to all deployments (not services). These can be overridden by annotations defined on the individual components.

== Strategies

As shown in the example above, the Jaeger instance is associated with a strategy. The strategy determines the architecture to be used for the Jaeger backend.

The available strategies are described in the following sections.
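
As a rough illustration of how a strategy value could be resolved, here is a minimal Go sketch. It is not the operator's code: `resolveStrategy` and `knownStrategies` are hypothetical names, the case-insensitive matching is an assumption, and how the operator itself treats an unrecognized value is not covered here.

```go
package main

import (
	"fmt"
	"strings"
)

// knownStrategies lists the three strategy values named in the README.
var knownStrategies = []string{"allInOne", "production", "streaming"}

// resolveStrategy returns the canonical strategy name for the given input.
// An empty value falls back to the documented default, allInOne. Matching is
// case-insensitive (an assumption). ok is false for an unrecognized value.
func resolveStrategy(input string) (name string, ok bool) {
	if input == "" {
		return "allInOne", true
	}
	for _, known := range knownStrategies {
		if strings.EqualFold(input, known) {
			return known, true
		}
	}
	return "", false
}

func main() {
	for _, in := range []string{"", "Production", "streaming", "bogus"} {
		name, ok := resolveStrategy(in)
		fmt.Printf("%q -> %q (recognized: %t)\n", in, name, ok)
	}
}
```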

=== AllInOne (Default)

This strategy is intended for development, testing and demo purposes.

The main backend components (agent, collector and query service) are all packaged into a single executable, which is configured by default to use in-memory storage.

=== Production

The `production` strategy is intended (as the name suggests) for production environments, where long-term storage of trace data is important and a more scalable, highly available architecture is required. Each of the backend components is therefore deployed separately.

The agent can be injected as a sidecar into the instrumented application or deployed as a DaemonSet.

The query and collector services are configured with a supported storage type, currently Cassandra or Elasticsearch. Multiple instances of each of these components can be provisioned as required for performance and resilience.

The main additional requirement is to provide the details of the storage type and options, for example:

[source,yaml]
----
storage:
  type: elasticsearch
  options:
    es:
      server-urls: http://elasticsearch:9200
----

=== Streaming

The `streaming` strategy augments the `production` strategy with a streaming capability that sits between the collector and the backend storage (e.g. Cassandra or Elasticsearch). This reduces the pressure on the backend storage under high load, and lets other trace post-processing capabilities tap into the real-time span data directly from the streaming platform (Kafka).

The only additional information required is the details for accessing the Kafka platform, which are configured in a new `ingester` component:

[source,yaml]
----
apiVersion: io.jaegertracing/v1alpha1
kind: Jaeger
metadata:
  name: simple-streaming
spec:
  strategy: streaming
  ingester:
    options:
      kafka: # <1>
        topic: jaeger-spans
        brokers: my-cluster-kafka-brokers.kafka:9092
      ingester:
        deadlockInterval: 0 # <2>
  storage:
    type: elasticsearch
    options:
      es:
        server-urls: http://elasticsearch:9200
----
<1> Identifies the Kafka configuration used by the collector to produce the messages and by the ingester to consume them.
<2> The deadlock interval can be disabled (set to 0) to avoid the ingester being terminated when no messages arrive within the default 1-minute period.
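
The nested `options` map above ends up as command-line flags on the underlying binary, in the same way that `log-level: debug` becomes `--log-level=debug`. The Go sketch below illustrates one way such a map could be flattened, assuming nested keys are joined with dots. `flattenOptions` is a hypothetical helper written for this illustration, not the operator's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// flattenOptions walks a nested options map and returns one "--key=value"
// argument per leaf, joining nested keys with dots. Hypothetical helper,
// not the operator's own code.
func flattenOptions(prefix string, opts map[string]interface{}) []string {
	var args []string
	for key, value := range opts {
		name := key
		if prefix != "" {
			name = prefix + "." + key
		}
		if nested, ok := value.(map[string]interface{}); ok {
			args = append(args, flattenOptions(name, nested)...)
			continue
		}
		args = append(args, fmt.Sprintf("--%s=%v", name, value))
	}
	sort.Strings(args) // map iteration order is random; sort for stable output
	return args
}

func main() {
	options := map[string]interface{}{
		"kafka": map[string]interface{}{
			"topic":   "jaeger-spans",
			"brokers": "my-cluster-kafka-brokers.kafka:9092",
		},
		"ingester": map[string]interface{}{
			"deadlockInterval": 0,
		},
	}
	for _, arg := range flattenOptions("", options) {
		fmt.Println(arg)
	}
}
```

Sorting the result is a choice made here only to keep the output deterministic; the order of flags is not significant in this sketch.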

== Accessing the UI

=== Kubernetes
@@ -354,7 +415,7 @@ spec:
datacenter: "datacenter3"
mode: "test"
----
<1> The same works for `production`
<1> The same works for `production` and `streaming`
<2> These options are for the regular Jaeger components, like `collector` and `query`
<3> The options for the `create-schema` job

24 changes: 24 additions & 0 deletions deploy/examples/simple-streaming.yaml
@@ -0,0 +1,24 @@
# setup an elasticsearch with `make es`
# setup a kafka platform using https://strimzi.io with quickstart instructions
apiVersion: io.jaegertracing/v1alpha1
kind: Jaeger
metadata:
  name: simple-streaming
spec:
  strategy: streaming
  collector:
    options:
      log-level: debug
  ingester:
    options:
      kafka:
        topic: jaeger-spans
        brokers: my-cluster-kafka-brokers.kafka:9092
      ingester:
        deadlockInterval: 0
      log-level: debug
  storage:
    type: elasticsearch
    options:
      es:
        server-urls: http://elasticsearch:9200
9 changes: 9 additions & 0 deletions pkg/apis/io/v1alpha1/jaeger_types.go
@@ -50,6 +50,7 @@ type JaegerSpec struct {
	AllInOne  JaegerAllInOneSpec  `json:"allInOne"`
	Query     JaegerQuerySpec     `json:"query"`
	Collector JaegerCollectorSpec `json:"collector"`
	Ingester  JaegerIngesterSpec  `json:"ingester"`
	Agent     JaegerAgentSpec     `json:"agent"`
	UI        JaegerUISpec        `json:"ui"`
	Sampling  JaegerSamplingSpec  `json:"sampling"`
@@ -111,6 +112,14 @@ type JaegerCollectorSpec struct {
	JaegerCommonSpec
}

// JaegerIngesterSpec defines the options to be used when deploying the ingester
type JaegerIngesterSpec struct {
	Size    int     `json:"size"`
	Image   string  `json:"image"`
	Options Options `json:"options"`
	JaegerCommonSpec
}

// JaegerAgentSpec defines the options to be used when deploying the agent
type JaegerAgentSpec struct {
	Strategy string `json:"strategy"` // can be either 'DaemonSet' or 'Sidecar' (default)
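
To show how the `json` tags on the new `JaegerIngesterSpec` map onto the `ingester` section of a Jaeger resource, here is a small Go sketch. It uses a simplified stand-in type: `ingesterSpec` and `parseIngester` are hypothetical names, `Options` is modelled as a plain map rather than the operator's `Options` type, and the embedded `JaegerCommonSpec` is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ingesterSpec is a simplified stand-in for JaegerIngesterSpec: Options is a
// plain map here, and the embedded JaegerCommonSpec is omitted.
type ingesterSpec struct {
	Size    int                    `json:"size"`
	Image   string                 `json:"image"`
	Options map[string]interface{} `json:"options"`
}

// parseIngester decodes the JSON form of an "ingester" section.
func parseIngester(raw []byte) (ingesterSpec, error) {
	var spec ingesterSpec
	err := json.Unmarshal(raw, &spec)
	return spec, err
}

func main() {
	raw := []byte(`{"size": 2, "options": {"kafka": {"topic": "jaeger-spans"}}}`)
	spec, err := parseIngester(raw)
	if err != nil {
		panic(err)
	}
	// Fields missing from the input keep their zero values (empty Image here).
	fmt.Println(spec.Size, spec.Image == "", spec.Options["kafka"])
}
```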
19 changes: 19 additions & 0 deletions pkg/apis/io/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/cmd/start/main.go
@@ -39,6 +39,9 @@ func NewStartCommand() *cobra.Command {
	cmd.Flags().String("jaeger-collector-image", "jaegertracing/jaeger-collector", "The Docker image for the Jaeger Collector")
	viper.BindPFlag("jaeger-collector-image", cmd.Flags().Lookup("jaeger-collector-image"))

	cmd.Flags().String("jaeger-ingester-image", "jaegertracing/jaeger-ingester", "The Docker image for the Jaeger Ingester")
	viper.BindPFlag("jaeger-ingester-image", cmd.Flags().Lookup("jaeger-ingester-image"))

	cmd.Flags().String("jaeger-all-in-one-image", "jaegertracing/all-in-one", "The Docker image for the Jaeger all-in-one")
	viper.BindPFlag("jaeger-all-in-one-image", cmd.Flags().Lookup("jaeger-all-in-one-image"))

24 changes: 12 additions & 12 deletions pkg/deployment/all-in-one_test.go
@@ -27,11 +27,11 @@ func TestDefaultAllInOneImage(t *testing.T) {
assert.Equal(t, "org/custom-all-in-one-image:123", d.Spec.Template.Spec.Containers[0].Image)

envvars := []v1.EnvVar{
v1.EnvVar{
{
Name: "SPAN_STORAGE_TYPE",
Value: "",
},
v1.EnvVar{
{
Name: "COLLECTOR_ZIPKIN_HTTP_PORT",
Value: "9411",
},
@@ -80,27 +80,27 @@ func TestAllInOneVolumeMountsWithVolumes(t *testing.T) {
name := "TestAllInOneVolumeMountsWithVolumes"

globalVolumes := []v1.Volume{
v1.Volume{
{
Name: "globalVolume",
VolumeSource: v1.VolumeSource{},
},
}

globalVolumeMounts := []v1.VolumeMount{
v1.VolumeMount{
{
Name: "globalVolume",
},
}

allInOneVolumes := []v1.Volume{
v1.Volume{
{
Name: "allInOneVolume",
VolumeSource: v1.VolumeSource{},
},
}

allInOneVolumeMounts := []v1.VolumeMount{
v1.VolumeMount{
{
Name: "allInOneVolume",
},
}
@@ -138,14 +138,14 @@ func TestAllInOneMountGlobalVolumes(t *testing.T) {
name := "TestAllInOneMountGlobalVolumes"

globalVolumes := []v1.Volume{
v1.Volume{
{
Name: "globalVolume",
VolumeSource: v1.VolumeSource{},
},
}

allInOneVolumeMounts := []v1.VolumeMount{
v1.VolumeMount{
{
Name: "globalVolume",
ReadOnly: true,
},
@@ -166,14 +166,14 @@ func TestAllInOneVolumeMountsWithSameName(t *testing.T) {
name := "TestAllInOneVolumeMountsWithSameName"

globalVolumeMounts := []v1.VolumeMount{
v1.VolumeMount{
{
Name: "data",
ReadOnly: true,
},
}

allInOneVolumeMounts := []v1.VolumeMount{
v1.VolumeMount{
{
Name: "data",
ReadOnly: false,
},
@@ -194,14 +194,14 @@ func TestAllInOneVolumeWithSameName(t *testing.T) {
name := "TestAllInOneVolumeWithSameName"

globalVolumes := []v1.Volume{
v1.Volume{
{
Name: "data",
VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data1"}},
},
}

allInOneVolumes := []v1.Volume{
v1.Volume{
{
Name: "data",
VolumeSource: v1.VolumeSource{HostPath: &v1.HostPathVolumeSource{Path: "/data2"}},
},
12 changes: 10 additions & 2 deletions pkg/deployment/collector.go
@@ -2,6 +2,7 @@ package deployment

import (
	"fmt"
	"strings"

	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
@@ -64,8 +65,15 @@ func (c *Collector) Get() *appsv1.Deployment {
		})
	}

	storageType := c.jaeger.Spec.Storage.Type
	// If strategy is "streaming", then change storage type
	// to Kafka, and the storage options will be used in the Ingester instead
	if strings.EqualFold(c.jaeger.Spec.Strategy, "streaming") {
		storageType = "kafka"
	}
	options := allArgs(c.jaeger.Spec.Collector.Options,
		c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(c.jaeger.Spec.Storage.Type)))
		c.jaeger.Spec.Storage.Options.Filter(storage.OptionsPrefix(storageType)),
		c.jaeger.Spec.Ingester.Options.Filter(storage.OptionsPrefix(storageType)))

	sampling.Update(c.jaeger, commonSpec, &options)

Expand Down Expand Up @@ -105,7 +113,7 @@ func (c *Collector) Get() *appsv1.Deployment {
Env: []v1.EnvVar{
v1.EnvVar{
Name: "SPAN_STORAGE_TYPE",
Value: c.jaeger.Spec.Storage.Type,
Value: storageType,
},
v1.EnvVar{
Name: "COLLECTOR_ZIPKIN_HTTP_PORT",
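
The storage-type override added to `pkg/deployment/collector.go` can be looked at in isolation. The Go sketch below extracts that decision into a standalone function so its behavior is easy to check. `collectorStorageType` is a hypothetical name used only here; the diff itself performs the check inline in `Collector.Get()`.

```go
package main

import (
	"fmt"
	"strings"
)

// collectorStorageType returns the value the collector would receive as
// SPAN_STORAGE_TYPE: "kafka" for the streaming strategy (compared
// case-insensitively, as in the diff), otherwise the configured storage type.
func collectorStorageType(strategy, configured string) string {
	if strings.EqualFold(strategy, "streaming") {
		return "kafka"
	}
	return configured
}

func main() {
	fmt.Println(collectorStorageType("Streaming", "elasticsearch"))
	fmt.Println(collectorStorageType("production", "elasticsearch"))
}
```

With the `streaming` strategy, the collector therefore writes to Kafka, and the configured storage type (Elasticsearch in the examples) is left for the ingester to use.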