Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jaeger-v2] Add support for Cassandra #5253

Merged
merged 14 commits into from
Mar 16, 2024
41 changes: 41 additions & 0 deletions cmd/jaeger/cassandra_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
jaeger_query:
trace_storage: cassandra_main
trace_storage_archive: cassandra_archive
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
cassandra:
cassandra_main:
servers: 127.0.0.1
port: 9042
cassandra_archive:
servers: 127.0.0.1
port: 9042
receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: cassandra_main
10 changes: 6 additions & 4 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"reflect"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
Expand All @@ -15,10 +16,11 @@ import (

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandraCfg.Configuration `mapstructure:"cassandra"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand Down
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
Expand Down Expand Up @@ -128,12 +130,19 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Elasticsearch,
builder: es.NewFactoryWithConfig,
}
cassandraStarter := &starter[cassandraCfg.Configuration, *cassandra.Factory]{
ext: s,
storageKind: "cassandra",
cfg: s.config.Cassandra,
builder: cassandra.NewFactoryWithConfig,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
badgerStarter.build,
grpcStarter.build,
esStarter.build,
cassandraStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
32 changes: 19 additions & 13 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.uber.org/zap"

Expand All @@ -29,21 +30,21 @@

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
type Configuration struct {
Servers []string `validate:"nonzero" mapstructure:"servers"`
Servers []string `validate:"nonzero" mapstructure:"servers" valid:"required,url"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we know what would be using the validate: tags? Maybe we should convert them to valid:?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go-playground/validator is using the validate: tags. Since we are using asaskevich/govalidator, we should change this to valid:.

Keyspace string `validate:"nonzero" mapstructure:"keyspace"`
LocalDC string `yaml:"local_dc" mapstructure:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host" mapstructure:"connections_per_host"`
LocalDC string `mapstructure:"local_dc"`
ConnectionsPerHost int `validate:"min=1" mapstructure:"connections_per_host"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
ConnectTimeout time.Duration `yaml:"connect_timeout" mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval" mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive" mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt" mapstructure:"max_retry_attempts"`
ProtoVersion int `yaml:"proto_version" mapstructure:"proto_version"`
Consistency string `yaml:"consistency" mapstructure:"consistency"`
DisableCompression bool `yaml:"disable-compression" mapstructure:"disable_compression"`
Port int `yaml:"port" mapstructure:"port"`
Authenticator Authenticator `yaml:"authenticator" mapstructure:",squash"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery" mapstructure:"-"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `validate:"min=500" mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" mapstructure:"max_retry_attempts"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
DisableCompression bool `mapstructure:"disable_compression"`
Port int `mapstructure:"port"`
Authenticator Authenticator `mapstructure:",squash"`
DisableAutoDiscovery bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
}

Expand Down Expand Up @@ -170,3 +171,8 @@
func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err

Check warning on line 177 in pkg/cassandra/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/cassandra/config/config.go#L175-L177

Added lines #L175 - L177 were not covered by tests
}
50 changes: 50 additions & 0 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import (
"errors"
"flag"
"fmt"
"io"

"github.com/spf13/viper"
Expand Down Expand Up @@ -78,6 +79,55 @@
}
}

// NewFactoryWithConfig initializes factory with Config.
func NewFactoryWithConfig(
cfg config.Configuration,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
archive := make(map[string]*namespaceConfig)
serverURL, err := getServers(cfg.Servers)
if err != nil {
return nil, err
}
archive[archiveStorageConfig] = &namespaceConfig{
Configuration: cfg,
servers: serverURL,
namespace: archiveStorageConfig,
Enabled: true,

Check warning on line 97 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L93-L97

Added lines #L93 - L97 were not covered by tests
}

f := NewFactory()
f.InitFromOptions(&Options{
Primary: namespaceConfig{
Configuration: cfg,
servers: serverURL,
namespace: primaryStorageConfig,
Enabled: true,
},
others: archive,
})
err = f.Initialize(metricsFactory, logger)
if err != nil {
return nil, err

Check warning on line 112 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L100-L112

Added lines #L100 - L112 were not covered by tests
}
return f, nil

Check warning on line 114 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L114

Added line #L114 was not covered by tests
}

func getServers(servers []string) (string, error) {
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
if len(servers) == 0 {
return "", fmt.Errorf("servers not found")
}
serverURL := servers[0]
for i, server := range servers {
if i == 0 {
continue

Check warning on line 124 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L121-L124

Added lines #L121 - L124 were not covered by tests
}
serverURL = serverURL + ", " + server

Check warning on line 126 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L126

Added line #L126 was not covered by tests
}
return serverURL, nil

Check warning on line 128 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L128

Added line #L128 was not covered by tests
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
Expand Down
34 changes: 34 additions & 0 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -197,3 +198,36 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o.GetPrimary(), f.primaryConfig)
assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig)
}

func TestConfigurationValidation(t *testing.T) {
testCases := []struct {
name string
cfg cassandraCfg.Configuration
wantErr bool
}{
{
name: "valid configuration",
cfg: cassandraCfg.Configuration{
Servers: []string{"http://localhost:9200"},
},
wantErr: false,
},
{
name: "missing servers",
cfg: cassandraCfg.Configuration{},
wantErr: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
err := test.cfg.Validate()
if test.wantErr {
require.Error(t, err)
_, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
Loading