Skip to content

Commit

Permalink
[jaeger-v2] Add support for Cassandra (#5253)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- part of #4843 

## Description of the changes
- add support for cassandra

## How was this change tested?
- currently, tests are failing.

## Checklist
- [X] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [X] I have signed all commits
- [X] I have added unit tests for the new functionality
- [X] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Pushkar Mishra <[email protected]>
  • Loading branch information
Pushkarm029 authored Mar 16, 2024
1 parent 990c4e1 commit 3912f00
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 19 deletions.
41 changes: 41 additions & 0 deletions cmd/jaeger/cassandra_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: cassandra_main
trace_storage_archive: cassandra_archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
cassandra:
cassandra_main:
servers: 127.0.0.1
port: 9042
cassandra_archive:
servers: 127.0.0.1
port: 9042
receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: cassandra_main
10 changes: 6 additions & 4 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"reflect"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
Expand All @@ -15,10 +16,11 @@ import (

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandraCfg.Configuration `mapstructure:"cassandra"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
Expand Down Expand Up @@ -128,12 +130,19 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Elasticsearch,
builder: es.NewFactoryWithConfig,
}
cassandraStarter := &starter[cassandraCfg.Configuration, *cassandra.Factory]{
ext: s,
storageKind: "cassandra",
cfg: s.config.Cassandra,
builder: cassandra.NewFactoryWithConfig,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
grpcStarter.build,
esStarter.build,
cassandraStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
36 changes: 21 additions & 15 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.uber.org/zap"

Expand All @@ -29,21 +30,21 @@ import (

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
type Configuration struct {
Servers []string `validate:"nonzero" mapstructure:"servers"`
Keyspace string `validate:"nonzero" mapstructure:"keyspace"`
LocalDC string `yaml:"local_dc" mapstructure:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host" mapstructure:"connections_per_host"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
ConnectTimeout time.Duration `yaml:"connect_timeout" mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval" mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive" mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt" mapstructure:"max_retry_attempts"`
ProtoVersion int `yaml:"proto_version" mapstructure:"proto_version"`
Consistency string `yaml:"consistency" mapstructure:"consistency"`
DisableCompression bool `yaml:"disable-compression" mapstructure:"disable_compression"`
Port int `yaml:"port" mapstructure:"port"`
Authenticator Authenticator `yaml:"authenticator" mapstructure:",squash"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery" mapstructure:"-"`
Servers []string `valid:"required,url" mapstructure:"servers"`
Keyspace string `valid:"nonzero" mapstructure:"keyspace"`
LocalDC string `mapstructure:"local_dc"`
ConnectionsPerHost int `valid:"min=1" mapstructure:"connections_per_host"`
Timeout time.Duration `valid:"min=500" mapstructure:"-"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `valid:"min=500" mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `valid:"min=0" mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `valid:"min=0" mapstructure:"max_retry_attempts"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
DisableCompression bool `mapstructure:"disable_compression"`
Port int `mapstructure:"port"`
Authenticator Authenticator `mapstructure:",squash"`
DisableAutoDiscovery bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
}

Expand Down Expand Up @@ -170,3 +171,8 @@ func (c *Configuration) Close() error {
func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
}
18 changes: 18 additions & 0 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ func NewFactory() *Factory {
}
}

// NewFactoryWithConfig initializes factory with Config.
func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
f := NewFactory()
f.primaryConfig = &cfg
err := f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
Expand Down
34 changes: 34 additions & 0 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -197,3 +198,36 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o.GetPrimary(), f.primaryConfig)
assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig)
}

func TestConfigurationValidation(t *testing.T) {
testCases := []struct {
name string
cfg cassandraCfg.Configuration
wantErr bool
}{
{
name: "valid configuration",
cfg: cassandraCfg.Configuration{
Servers: []string{"http://localhost:9200"},
},
wantErr: false,
},
{
name: "missing servers",
cfg: cassandraCfg.Configuration{},
wantErr: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
err := test.cfg.Validate()
if test.wantErr {
require.Error(t, err)
_, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

0 comments on commit 3912f00

Please sign in to comment.