diff --git a/cmd/jaeger/cassandra_config.yaml b/cmd/jaeger/cassandra_config.yaml new file mode 100644 index 00000000000..86efdcfaf29 --- /dev/null +++ b/cmd/jaeger/cassandra_config.yaml @@ -0,0 +1,41 @@ +service: + extensions: [jaeger_storage, jaeger_query] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [jaeger_storage_exporter] + +extensions: + jaeger_query: + trace_storage: cassandra_main + trace_storage_archive: cassandra_archive + ui_config: ./cmd/jaeger/config-ui.json + + jaeger_storage: + cassandra: + cassandra_main: + servers: 127.0.0.1 + port: 9042 + cassandra_archive: + servers: 127.0.0.1 + port: 9042 +receivers: + otlp: + protocols: + grpc: + http: + + jaeger: + protocols: + grpc: + thrift_binary: + thrift_compact: + thrift_http: + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: cassandra_main \ No newline at end of file diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index 87464ef9311..44636000724 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -7,6 +7,7 @@ import ( "fmt" "reflect" + cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" @@ -15,10 +16,11 @@ import ( // Config has the configuration for jaeger-query, type Config struct { - Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` - Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` - GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` - Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` + Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` + Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` + GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` + Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` + Cassandra map[string]cassandraCfg.Configuration `mapstructure:"cassandra"` // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 70275674e33..2f292ad030f 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -13,11 +13,13 @@ import ( "go.opentelemetry.io/collector/extension" "go.uber.org/zap" + cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/storage/badger" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/grpc" grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" @@ -128,12 +130,19 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { cfg: s.config.Elasticsearch, builder: es.NewFactoryWithConfig, } + cassandraStarter := &starter[cassandraCfg.Configuration, *cassandra.Factory]{ + ext: s, + storageKind: "cassandra", + cfg: s.config.Cassandra, + builder: cassandra.NewFactoryWithConfig, + } builders := []func(ctx context.Context, host component.Host) error{ memStarter.build, badgerStarter.build, grpcStarter.build, esStarter.build, + cassandraStarter.build, // TODO add support for other backends } for _, builder := range builders { diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index e960af81490..a55ed402756 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/asaskevich/govalidator" "github.com/gocql/gocql" "go.uber.org/zap" @@ -29,21 +30,21 @@ import ( // Configuration describes the configuration properties needed to connect to a Cassandra cluster type Configuration struct { - Servers []string `validate:"nonzero" mapstructure:"servers"` - Keyspace string `validate:"nonzero" mapstructure:"keyspace"` - LocalDC string `yaml:"local_dc" mapstructure:"local_dc"` - ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host" mapstructure:"connections_per_host"` - Timeout time.Duration `validate:"min=500" mapstructure:"-"` - ConnectTimeout time.Duration `yaml:"connect_timeout" mapstructure:"connection_timeout"` - ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval" mapstructure:"reconnect_interval"` - SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive" mapstructure:"socket_keep_alive"` - MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt" mapstructure:"max_retry_attempts"` - ProtoVersion int `yaml:"proto_version" mapstructure:"proto_version"` - Consistency string `yaml:"consistency" mapstructure:"consistency"` - DisableCompression bool `yaml:"disable-compression" mapstructure:"disable_compression"` - Port int `yaml:"port" mapstructure:"port"` - Authenticator Authenticator `yaml:"authenticator" mapstructure:",squash"` - DisableAutoDiscovery bool `yaml:"disable_auto_discovery" mapstructure:"-"` + Servers []string `valid:"required,url" mapstructure:"servers"` + Keyspace string `valid:"nonzero" mapstructure:"keyspace"` + LocalDC string `mapstructure:"local_dc"` + ConnectionsPerHost int `valid:"min=1" mapstructure:"connections_per_host"` + Timeout time.Duration `valid:"min=500" mapstructure:"-"` + ConnectTimeout time.Duration `mapstructure:"connection_timeout"` + ReconnectInterval time.Duration `valid:"min=500" mapstructure:"reconnect_interval"` + SocketKeepAlive time.Duration `valid:"min=0" mapstructure:"socket_keep_alive"` + MaxRetryAttempts int `valid:"min=0" mapstructure:"max_retry_attempts"` + ProtoVersion int `mapstructure:"proto_version"` + Consistency string `mapstructure:"consistency"` + DisableCompression bool `mapstructure:"disable_compression"` + Port int `mapstructure:"port"` + Authenticator Authenticator `mapstructure:",squash"` + DisableAutoDiscovery bool `mapstructure:"-"` TLS tlscfg.Options `mapstructure:"tls"` } @@ -170,3 +171,8 @@ func (c *Configuration) Close() error { func (c *Configuration) String() string { return fmt.Sprintf("%+v", *c) } + +func (c *Configuration) Validate() error { + _, err := govalidator.ValidateStruct(c) + return err +} diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index fed7bb77c85..dfa12fcb974 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -78,6 +78,24 @@ func NewFactory() *Factory { } } +// NewFactoryWithConfig initializes factory with Config. +func NewFactoryWithConfig( + cfg config.Configuration, + metricsFactory metrics.Factory, + logger *zap.Logger, +) (*Factory, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + f := NewFactory() + f.primaryConfig = &cfg + err := f.Initialize(metricsFactory, logger) + if err != nil { + return nil, err + } + return f, nil +} + // AddFlags implements plugin.Configurable func (f *Factory) AddFlags(flagSet *flag.FlagSet) { f.Options.AddFlags(flagSet) diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 0a5a52e2bb8..97f1bf20acd 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/cassandra" + cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" "github.com/jaegertracing/jaeger/pkg/cassandra/mocks" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -197,3 +198,36 @@ func TestInitFromOptions(t *testing.T) { assert.Equal(t, o.GetPrimary(), f.primaryConfig) assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig) } + +func TestConfigurationValidation(t *testing.T) { + testCases := []struct { + name string + cfg cassandraCfg.Configuration + wantErr bool + }{ + { + name: "valid configuration", + cfg: cassandraCfg.Configuration{ + Servers: []string{"http://localhost:9200"}, + }, + wantErr: false, + }, + { + name: "missing servers", + cfg: cassandraCfg.Configuration{}, + wantErr: true, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + err := test.cfg.Validate() + if test.wantErr { + require.Error(t, err) + _, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +}