Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add support for Cassandra #5253

Merged
merged 14 commits into from
Mar 16, 2024
41 changes: 41 additions & 0 deletions cmd/jaeger/cassandra_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: cassandra_main
trace_storage_archive: cassandra_archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
cassandra:
cassandra_main:
servers: 127.0.0.1
port: 9042
cassandra_archive:
servers: 127.0.0.1
port: 9042
receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: cassandra_main
10 changes: 6 additions & 4 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"reflect"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
Expand All @@ -15,10 +16,11 @@ import (

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandraCfg.Configuration `mapstructure:"cassandra"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
Expand Down Expand Up @@ -128,12 +130,19 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Elasticsearch,
builder: es.NewFactoryWithConfig,
}
cassandraStarter := &starter[cassandraCfg.Configuration, *cassandra.Factory]{
ext: s,
storageKind: "cassandra",
cfg: s.config.Cassandra,
builder: cassandra.NewFactoryWithConfig,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
grpcStarter.build,
esStarter.build,
cassandraStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
36 changes: 21 additions & 15 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.uber.org/zap"

Expand All @@ -29,21 +30,21 @@

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
type Configuration struct {
Servers []string `validate:"nonzero" mapstructure:"servers"`
Keyspace string `validate:"nonzero" mapstructure:"keyspace"`
LocalDC string `yaml:"local_dc" mapstructure:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host" mapstructure:"connections_per_host"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
ConnectTimeout time.Duration `yaml:"connect_timeout" mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval" mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive" mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt" mapstructure:"max_retry_attempts"`
ProtoVersion int `yaml:"proto_version" mapstructure:"proto_version"`
Consistency string `yaml:"consistency" mapstructure:"consistency"`
DisableCompression bool `yaml:"disable-compression" mapstructure:"disable_compression"`
Port int `yaml:"port" mapstructure:"port"`
Authenticator Authenticator `yaml:"authenticator" mapstructure:",squash"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery" mapstructure:"-"`
Servers []string `valid:"required,url" mapstructure:"servers"`
Keyspace string `valid:"nonzero" mapstructure:"keyspace"`
LocalDC string `mapstructure:"local_dc"`
ConnectionsPerHost int `valid:"min=1" mapstructure:"connections_per_host"`
Timeout time.Duration `valid:"min=500" mapstructure:"-"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `valid:"min=500" mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `valid:"min=0" mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `valid:"min=0" mapstructure:"max_retry_attempts"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
DisableCompression bool `mapstructure:"disable_compression"`
Port int `mapstructure:"port"`
Authenticator Authenticator `mapstructure:",squash"`
DisableAutoDiscovery bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
}

Expand Down Expand Up @@ -170,3 +171,8 @@
func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err

Check warning on line 177 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L175-L177

Added lines #L175 - L177 were not covered by tests
}
18 changes: 18 additions & 0 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@
}
}

// NewFactoryWithConfig initializes factory with Config.
func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
f := NewFactory()
f.primaryConfig = &cfg
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err

Check warning on line 94 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L90-L94

Added lines #L90 - L94 were not covered by tests
}
return f, nil

Check warning on line 96 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L96

Added line #L96 was not covered by tests
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
Expand Down
34 changes: 34 additions & 0 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -197,3 +198,36 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o.GetPrimary(), f.primaryConfig)
assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig)
}

func TestConfigurationValidation(t *testing.T) {
testCases := []struct {
name string
cfg cassandraCfg.Configuration
wantErr bool
}{
{
name: "valid configuration",
cfg: cassandraCfg.Configuration{
Servers: []string{"http://localhost:9200"},
},
wantErr: false,
},
{
name: "missing servers",
cfg: cassandraCfg.Configuration{},
wantErr: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
err := test.cfg.Validate()
if test.wantErr {
require.Error(t, err)
_, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
Loading