From e9fac0575b5cc3b5666b6432f25759d6e09dfd0d Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh <62210712+akstron@users.noreply.github.com> Date: Sat, 30 Nov 2024 11:46:50 -0800 Subject: [PATCH] Create Cassandra db schema on session initialization (#5922) Create Schema (if not present) on Session Initialization Once a session is established with cassandra db, the added code parses the template file containing queries for creating schema and create queries out of it. Post which it executes those queries to create the required types and tables. ## Which problem is this PR solving? Resolves #5797 ## Description of the changes - The PR includes the following changes: - 1. Embedding template files into binary - 2. Creation of database schema in initialization steps once session to database is established. ## How was this change tested? - Schema rendering is being tested with unit test. - bash scripts/cassandra-integration-test.sh -s 4 v004 v2 ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Alok Kumar Singh Signed-off-by: Alok Kumar Singh <62210712+akstron@users.noreply.github.com> Signed-off-by: Yuri Shkuro Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- .github/workflows/ci-e2e-cassandra.yml | 11 +- cmd/jaeger/config-cassandra.yaml | 4 +- pkg/cassandra/config/config.go | 86 ++++++-- pkg/cassandra/config/config_test.go | 24 +++ plugin/storage/cassandra/factory.go | 65 +++++- plugin/storage/cassandra/factory_test.go | 119 ++++++----- plugin/storage/cassandra/options.go | 4 +- .../storage/cassandra/savetracetest/main.go | 3 +- .../storage/cassandra/schema/package_test.go | 14 ++ plugin/storage/cassandra/schema/schema.go | 154 ++++++++++++++ .../storage/cassandra/schema/schema_test.go | 59 ++++++ .../cassandra/schema/v004-go-tmpl.cql.tmpl | 200 ++++++++++++++++++ scripts/cassandra-integration-test.sh | 9 +- 13 files changed, 662 insertions(+), 90 deletions(-) create mode 100644 plugin/storage/cassandra/schema/package_test.go create mode 100644 plugin/storage/cassandra/schema/schema.go create mode 100644 plugin/storage/cassandra/schema/schema_test.go create mode 100644 plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl diff --git a/.github/workflows/ci-e2e-cassandra.yml b/.github/workflows/ci-e2e-cassandra.yml index 72a930156c9..3ca6709e012 100644 --- a/.github/workflows/ci-e2e-cassandra.yml +++ b/.github/workflows/ci-e2e-cassandra.yml @@ -22,6 +22,7 @@ jobs: fail-fast: false matrix: jaeger-version: [v1, v2] + create-schema: [manual, auto] version: - distribution: cassandra major: 4.x @@ -29,7 +30,11 @@ jobs: - distribution: cassandra major: 5.x schema: v004 - name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }} + exclude: + # Exclude v1 as create schema on fly is available for v2 only + - jaeger-version: v1 + create-schema: auto + name: ${{ matrix.version.distribution }}-${{ matrix.version.major }} ${{ matrix.jaeger-version }} schema=${{ matrix.create-schema }} steps: - name: Harden Runner uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1 @@ -45,9 +50,11 @@ jobs: - name: Run cassandra integration tests id: test-execution run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }} + env: + SKIP_APPLY_SCHEMA: ${{ matrix.create-schema == 'auto' && true || false }} - name: Upload coverage to codecov uses: ./.github/actions/upload-codecov with: files: cover.out - flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }} + flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}-${{ matrix.create-schema }} diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 0b7550535da..4b55737f624 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -34,6 +34,7 @@ extensions: cassandra: schema: keyspace: "jaeger_v1_dc1" + create: "${env:CASSANDRA_CREATE_SCHEMA:-true}" connection: auth: basic: @@ -44,7 +45,8 @@ extensions: another_storage: cassandra: schema: - keyspace: "jaeger_v1_dc1" + keyspace: "jaeger_v1_dc1_archive" + create: "${env:CASSANDRA_CREATE_SCHEMA:-true}" connection: auth: basic: diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 6bab9c75da4..ea4e839a519 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -6,15 +6,13 @@ package config import ( "context" + "errors" "fmt" "time" "github.com/asaskevich/govalidator" "github.com/gocql/gocql" "go.opentelemetry.io/collector/config/configtls" - - "github.com/jaegertracing/jaeger/pkg/cassandra" - gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" ) // Configuration describes the configuration properties needed to connect to a Cassandra cluster. @@ -58,6 +56,19 @@ type Schema struct { // while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB, // that do not support SnappyCompression. DisableCompression bool `mapstructure:"disable_compression"` + // CreateSchema tells if the schema ahould be created during session initialization based on the configs provided + CreateSchema bool `mapstructure:"create" valid:"optional"` + // Datacenter is the name for network topology + Datacenter string `mapstructure:"datacenter" valid:"optional"` + // TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second + TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"` + // DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second + DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"` + // Replication factor for the db + ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"` + // CompactionWindow is the size of the window for TimeWindowCompactionStrategy. + // All SSTables within that window are grouped together into one SSTable. + CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"` } type Query struct { @@ -86,7 +97,13 @@ type BasicAuthenticator struct { func DefaultConfiguration() Configuration { return Configuration{ Schema: Schema{ - Keyspace: "jaeger_v1_test", + CreateSchema: false, + Keyspace: "jaeger_dc1", + Datacenter: "dc1", + TraceTTL: 2 * 24 * time.Hour, + DependenciesTTL: 2 * 24 * time.Hour, + ReplicationFactor: 1, + CompactionWindow: time.Minute, }, Connection: Connection{ Servers: []string{"127.0.0.1"}, @@ -106,6 +123,27 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { if c.Schema.Keyspace == "" { c.Schema.Keyspace = source.Schema.Keyspace } + + if c.Schema.Datacenter == "" { + c.Schema.Datacenter = source.Schema.Datacenter + } + + if c.Schema.TraceTTL == 0 { + c.Schema.TraceTTL = source.Schema.TraceTTL + } + + if c.Schema.DependenciesTTL == 0 { + c.Schema.DependenciesTTL = source.Schema.DependenciesTTL + } + + if c.Schema.ReplicationFactor == 0 { + c.Schema.ReplicationFactor = source.Schema.ReplicationFactor + } + + if c.Schema.CompactionWindow == 0 { + c.Schema.CompactionWindow = source.Schema.CompactionWindow + } + if c.Connection.ConnectionsPerHost == 0 { c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost } @@ -129,24 +167,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { } } -// SessionBuilder creates new cassandra.Session -type SessionBuilder interface { - NewSession() (cassandra.Session, error) -} - -// NewSession creates a new Cassandra session -func (c *Configuration) NewSession() (cassandra.Session, error) { - cluster, err := c.NewCluster() - if err != nil { - return nil, err - } - session, err := cluster.CreateSession() - if err != nil { - return nil, err - } - return gocqlw.WrapCQLSession(session), nil -} - // NewCluster creates a new gocql cluster from the configuration func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { cluster := gocql.NewCluster(c.Connection.Servers...) @@ -210,7 +230,27 @@ func (c *Configuration) String() string { return fmt.Sprintf("%+v", *c) } +func isValidTTL(duration time.Duration) bool { + return duration == 0 || duration >= time.Second +} + func (c *Configuration) Validate() error { _, err := govalidator.ValidateStruct(c) - return err + if err != nil { + return err + } + + if !isValidTTL(c.Schema.TraceTTL) { + return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second") + } + + if !isValidTTL(c.Schema.DependenciesTTL) { + return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second") + } + + if c.Schema.CompactionWindow < time.Minute { + return errors.New("compaction_window should at least be 1 minute") + } + + return nil } diff --git a/pkg/cassandra/config/config_test.go b/pkg/cassandra/config/config_test.go index 8d532f65c60..f1de4f6d55b 100644 --- a/pkg/cassandra/config/config_test.go +++ b/pkg/cassandra/config/config_test.go @@ -5,6 +5,7 @@ package config import ( "testing" + "time" "github.com/gocql/gocql" "github.com/stretchr/testify/assert" @@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) { Connection: Connection{ Servers: []string{"localhost:9200"}, }, + Schema: Schema{ + CompactionWindow: time.Minute, + }, } err := cfg.Validate() @@ -94,3 +98,23 @@ func TestToString(t *testing.T) { s := cfg.String() assert.Contains(t, s, "Keyspace:test") } + +func TestConfigSchemaValidation(t *testing.T) { + cfg := DefaultConfiguration() + err := cfg.Validate() + require.NoError(t, err) + + cfg.Schema.TraceTTL = time.Millisecond + err = cfg.Validate() + require.Error(t, err) + + cfg.Schema.TraceTTL = time.Second + cfg.Schema.CompactionWindow = time.Minute - 1 + err = cfg.Validate() + require.Error(t, err) + + cfg.Schema.CompactionWindow = time.Minute + cfg.Schema.DependenciesTTL = time.Second - 1 + err = cfg.Validate() + require.Error(t, err) +} diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 4654a2a2c82..49c7a2a9c55 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -17,6 +17,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/cassandra" "github.com/jaegertracing/jaeger/pkg/cassandra/config" + gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" "github.com/jaegertracing/jaeger/pkg/distributedlock" "github.com/jaegertracing/jaeger/pkg/hostname" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -24,6 +25,7 @@ import ( cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra" cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore" cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage" @@ -55,17 +57,22 @@ type Factory struct { logger *zap.Logger tracer trace.TracerProvider - primaryConfig config.SessionBuilder + primaryConfig config.Configuration + archiveConfig *config.Configuration + primarySession cassandra.Session - archiveConfig config.SessionBuilder archiveSession cassandra.Session + + // tests can override this + sessionBuilderFn func(*config.Configuration) (cassandra.Session, error) } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ - tracer: otel.GetTracerProvider(), - Options: NewOptions(primaryStorageConfig, archiveStorageConfig), + tracer: otel.GetTracerProvider(), + Options: NewOptions(primaryStorageConfig, archiveStorageConfig), + sessionBuilderFn: NewSession, } } @@ -126,9 +133,7 @@ func (f *Factory) configureFromOptions(o *Options) { o.others = make(map[string]*NamespaceConfig) } f.primaryConfig = o.GetPrimary() - if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { - f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error - } + f.archiveConfig = f.Options.Get(archiveStorageConfig) } // Initialize implements storage.Factory @@ -137,14 +142,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil}) f.logger = logger - primarySession, err := f.primaryConfig.NewSession() + primarySession, err := f.sessionBuilderFn(&f.primaryConfig) if err != nil { return err } f.primarySession = primarySession if f.archiveConfig != nil { - archiveSession, err := f.archiveConfig.NewSession() + archiveSession, err := f.sessionBuilderFn(f.archiveConfig) if err != nil { return err } @@ -155,6 +160,48 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) return nil } +// createSession creates session from a configuration +func createSession(c *config.Configuration) (cassandra.Session, error) { + cluster, err := c.NewCluster() + if err != nil { + return nil, err + } + + session, err := cluster.CreateSession() + if err != nil { + return nil, err + } + + return gocqlw.WrapCQLSession(session), nil +} + +// newSessionPrerequisites creates tables and types before creating a session +func newSessionPrerequisites(c *config.Configuration) error { + if !c.Schema.CreateSchema { + return nil + } + + cfg := *c // clone because we need to connect without specifying a keyspace + cfg.Schema.Keyspace = "" + + session, err := createSession(&cfg) + if err != nil { + return err + } + + sc := schema.NewSchemaCreator(session, c.Schema) + return sc.CreateSchemaIfNotPresent() +} + +// NewSession creates a new Cassandra session +func NewSession(c *config.Configuration) (cassandra.Session, error) { + if err := newSessionPrerequisites(c); err != nil { + return nil, err + } + + return createSession(c) +} + // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")) diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 9a2fcc486cf..d8800089a75 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -12,43 +12,48 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/cassandra" - cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" + "github.com/jaegertracing/jaeger/pkg/cassandra/config" "github.com/jaegertracing/jaeger/pkg/cassandra/mocks" - "github.com/jaegertracing/jaeger/pkg/config" + viperize "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" ) type mockSessionBuilder struct { - session *mocks.Session - err error + index int + sessions []*mocks.Session + errors []error } -func newMockSessionBuilder(session *mocks.Session, err error) *mockSessionBuilder { - return &mockSessionBuilder{ - session: session, - err: err, - } +func (m *mockSessionBuilder) add(session *mocks.Session, err error) *mockSessionBuilder { + m.sessions = append(m.sessions, session) + m.errors = append(m.errors, err) + return m } -func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) { - return m.session, m.err +func (m *mockSessionBuilder) build(*config.Configuration) (cassandra.Session, error) { + if m.index >= len(m.sessions) { + return nil, errors.New("no more sessions") + } + session := m.sessions[m.index] + err := m.errors[m.index] + m.index++ + return session, err } func TestCassandraFactory(t *testing.T) { logger, logBuf := testutils.NewLogger() f := NewFactory() - v, command := config.Viperize(f.AddFlags) + v, command := viperize.Viperize(f.AddFlags) command.ParseFlags([]string{"--cassandra-archive.enabled=true"}) f.InitFromViper(v, zap.NewNop()) - // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, - // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + f.sessionBuilderFn = new(mockSessionBuilder).add(nil, errors.New("made-up primary error")).build + require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up primary error") var ( session = &mocks.Session{} @@ -57,11 +62,13 @@ func TestCassandraFactory(t *testing.T) { session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) session.On("Close").Return() query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + f.sessionBuilderFn = new(mockSessionBuilder). + add(session, nil). + add(nil, errors.New("made-up archive error")).build + require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up archive error") f.archiveConfig = nil + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).build require.NoError(t, f.Initialize(metrics.NullFactory, logger)) assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") @@ -80,7 +87,8 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() require.EqualError(t, err, "archive storage not configured") - f.archiveConfig = newMockSessionBuilder(session, nil) + f.archiveConfig = &config.Configuration{} + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).add(session, nil).build require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err = f.CreateArchiveSpanReader() @@ -99,9 +107,8 @@ func TestCassandraFactory(t *testing.T) { } func TestExclusiveWhitelistBlacklist(t *testing.T) { - logger, logBuf := testutils.NewLogger() f := NewFactory() - v, command := config.Viperize(f.AddFlags) + v, command := viperize.Viperize(f.AddFlags) command.ParseFlags([]string{ "--cassandra-archive.enabled=true", "--cassandra.index.tag-whitelist=a,b,c", @@ -109,29 +116,19 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { }) f.InitFromViper(v, zap.NewNop()) - // after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests, - // so we override it with a mock. - f.primaryConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - var ( session = &mocks.Session{} query = &mocks.Query{} ) session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) query.On("Exec").Return(nil) - f.primaryConfig = newMockSessionBuilder(session, nil) - f.archiveConfig = newMockSessionBuilder(nil, errors.New("made-up error")) - require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") - - f.archiveConfig = nil - require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - assert.Contains(t, logBuf.String(), "Cassandra archive storage configuration is empty, skipping") + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).build _, err := f.CreateSpanWriter() require.EqualError(t, err, "only one of TagIndexBlacklist and TagIndexWhitelist can be specified") - f.archiveConfig = &mockSessionBuilder{} + f.archiveConfig = &config.Configuration{} + f.sessionBuilderFn = new(mockSessionBuilder).add(session, nil).add(session, nil).build require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err = f.CreateArchiveSpanWriter() @@ -140,7 +137,7 @@ func TestExclusiveWhitelistBlacklist(t *testing.T) { func TestWriterOptions(t *testing.T) { opts := NewOptions("cassandra") - v, command := config.Viperize(opts.AddFlags) + v, command := viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tag-whitelist=a,b,c"}) opts.InitFromViper(v) @@ -148,7 +145,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tag-blacklist=a,b,c"}) opts.InitFromViper(v) @@ -156,7 +153,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tags=false"}) opts.InitFromViper(v) @@ -164,7 +161,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{"--cassandra.index.tags=false", "--cassandra.index.tag-blacklist=a,b,c"}) opts.InitFromViper(v) @@ -172,7 +169,7 @@ func TestWriterOptions(t *testing.T) { assert.Len(t, options, 1) opts = NewOptions("cassandra") - v, command = config.Viperize(opts.AddFlags) + v, command = viperize.Viperize(opts.AddFlags) command.ParseFlags([]string{""}) opts.InitFromViper(v) @@ -194,11 +191,7 @@ func TestNewFactoryWithConfig(t *testing.T) { t.Run("valid configuration", func(t *testing.T) { opts := &Options{ Primary: NamespaceConfig{ - Configuration: cassandraCfg.Configuration{ - Connection: cassandraCfg.Connection{ - Servers: []string{"localhost:9200"}, - }, - }, + Configuration: config.DefaultConfiguration(), }, } f := NewFactory() @@ -216,11 +209,7 @@ func TestNewFactoryWithConfig(t *testing.T) { expErr := errors.New("made-up error") opts := &Options{ Primary: NamespaceConfig{ - Configuration: cassandraCfg.Configuration{ - Connection: cassandraCfg.Connection{ - Servers: []string{"localhost:9200"}, - }, - }, + Configuration: config.DefaultConfiguration(), }, } f := NewFactory() @@ -257,3 +246,33 @@ func TestFactory_Purge(t *testing.T) { session.AssertCalled(t, "Query", mock.AnythingOfType("string"), mock.Anything) query.AssertCalled(t, "Exec") } + +func TestNewSessionErrors(t *testing.T) { + t.Run("NewCluster error", func(t *testing.T) { + cfg := &config.Configuration{ + Connection: config.Connection{ + TLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "foobar", + }, + }, + }, + } + _, err := NewSession(cfg) + require.ErrorContains(t, err, "failed to load TLS config") + }) + t.Run("CreateSession error", func(t *testing.T) { + cfg := &config.Configuration{} + _, err := NewSession(cfg) + require.ErrorContains(t, err, "no hosts provided") + }) + t.Run("CreateSession error with schema", func(t *testing.T) { + cfg := &config.Configuration{ + Schema: config.Schema{ + CreateSchema: true, + }, + } + _, err := NewSession(cfg) + require.ErrorContains(t, err, "no hosts provided") + }) +} diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index e266436ba29..505d99c1ef3 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -256,8 +256,8 @@ func (cfg *NamespaceConfig) initFromViper(v *viper.Viper) { } // GetPrimary returns primary configuration. -func (opt *Options) GetPrimary() *config.Configuration { - return &opt.Primary.Configuration +func (opt *Options) GetPrimary() config.Configuration { + return opt.Primary.Configuration } // Get returns auxiliary named configuration. diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index d2190f5511e..d6c2b1f3fa1 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -14,6 +14,7 @@ import ( cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -36,7 +37,7 @@ func main() { Timeout: time.Millisecond * 750, }, } - cqlSession, err := cConfig.NewSession() + cqlSession, err := cassandra.NewSession(cConfig) if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) } diff --git a/plugin/storage/cassandra/schema/package_test.go b/plugin/storage/cassandra/schema/package_test.go new file mode 100644 index 00000000000..5c3adb21543 --- /dev/null +++ b/plugin/storage/cassandra/schema/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package schema + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/plugin/storage/cassandra/schema/schema.go b/plugin/storage/cassandra/schema/schema.go new file mode 100644 index 00000000000..25aa172355f --- /dev/null +++ b/plugin/storage/cassandra/schema/schema.go @@ -0,0 +1,154 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package schema + +import ( + "bytes" + "embed" + "errors" + "fmt" + "text/template" + "time" + + "github.com/jaegertracing/jaeger/pkg/cassandra" + "github.com/jaegertracing/jaeger/pkg/cassandra/config" +) + +//go:embed v004-go-tmpl.cql.tmpl +var schemaFile embed.FS + +type templateParams struct { + // Keyspace in which tables and types will be created for storage + Keyspace string + // Replication is the replication strategy used. Ex: "{'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }" + Replication string + // CompactionWindowInMinutes is constructed from CompactionWindow for using in template + CompactionWindowInMinutes int64 + // TraceTTLInSeconds is constructed from TraceTTL for using in template + TraceTTLInSeconds int64 + // DependenciesTTLInSeconds is constructed from DependenciesTTL for using in template + DependenciesTTLInSeconds int64 +} + +type Creator struct { + session cassandra.Session + schema config.Schema +} + +// NewSchemaCreator returns a new SchemaCreator +func NewSchemaCreator(session cassandra.Session, schema config.Schema) *Creator { + return &Creator{ + session: session, + schema: schema, + } +} + +func (sc *Creator) constructTemplateParams() templateParams { + return templateParams{ + Keyspace: sc.schema.Keyspace, + Replication: fmt.Sprintf("{'class': 'NetworkTopologyStrategy', 'replication_factor': '%v' }", sc.schema.ReplicationFactor), + CompactionWindowInMinutes: int64(sc.schema.CompactionWindow / time.Minute), + TraceTTLInSeconds: int64(sc.schema.TraceTTL / time.Second), + DependenciesTTLInSeconds: int64(sc.schema.DependenciesTTL / time.Second), + } +} + +func (*Creator) getQueryFileAsBytes(fileName string, params templateParams) ([]byte, error) { + tmpl, err := template.ParseFS(schemaFile, fileName) + if err != nil { + return nil, err + } + + var result bytes.Buffer + err = tmpl.Execute(&result, params) + if err != nil { + return nil, err + } + + return result.Bytes(), nil +} + +func (*Creator) getQueriesFromBytes(queryFile []byte) ([]string, error) { + lines := bytes.Split(queryFile, []byte("\n")) + + var extractedLines [][]byte + + for _, line := range lines { + // Remove any comments, if at the end of the line + commentIndex := bytes.Index(line, []byte(`--`)) + if commentIndex != -1 { + // remove everything after comment + line = line[0:commentIndex] + } + + trimmedLine := bytes.TrimSpace(line) + + if len(trimmedLine) == 0 { + continue + } + + extractedLines = append(extractedLines, trimmedLine) + } + + var queries []string + + // Construct individual queries strings + var queryString string + for _, line := range extractedLines { + queryString += string(line) + "\n" + if bytes.HasSuffix(line, []byte(";")) { + queries = append(queries, queryString) + queryString = "" + } + } + + if len(queryString) > 0 { + return nil, errors.New(`query exists in template without ";"`) + } + + return queries, nil +} + +func (sc *Creator) getCassandraQueriesFromQueryStrings(queries []string) []cassandra.Query { + var casQueries []cassandra.Query + + for _, query := range queries { + casQueries = append(casQueries, sc.session.Query(query)) + } + + return casQueries +} + +func (sc *Creator) contructSchemaQueries() ([]cassandra.Query, error) { + params := sc.constructTemplateParams() + + queryFile, err := sc.getQueryFileAsBytes(`v004-go-tmpl.cql.tmpl`, params) + if err != nil { + return nil, err + } + + queryStrings, err := sc.getQueriesFromBytes(queryFile) + if err != nil { + return nil, err + } + + casQueries := sc.getCassandraQueriesFromQueryStrings(queryStrings) + + return casQueries, nil +} + +func (sc *Creator) CreateSchemaIfNotPresent() error { + casQueries, err := sc.contructSchemaQueries() + if err != nil { + return err + } + + for _, query := range casQueries { + if err := query.Exec(); err != nil { + return err + } + } + + return nil +} diff --git a/plugin/storage/cassandra/schema/schema_test.go b/plugin/storage/cassandra/schema/schema_test.go new file mode 100644 index 00000000000..39e69b89916 --- /dev/null +++ b/plugin/storage/cassandra/schema/schema_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package schema + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueryGenerationFromBytes(t *testing.T) { + queriesAsString := ` +query1 -- comment (this should be removed) +query1-continue +query1-finished; -- + + +query2; +query-3 query-3-continue query-3-finished; +` + expGeneratedQueries := []string{ + `query1 +query1-continue +query1-finished; +`, + `query2; +`, + `query-3 query-3-continue query-3-finished; +`, + } + + sc := Creator{} + queriesAsBytes := []byte(queriesAsString) + queries, err := sc.getQueriesFromBytes(queriesAsBytes) + require.NoError(t, err) + + require.Equal(t, len(expGeneratedQueries), len(queries)) + + for i := range len(expGeneratedQueries) { + require.Equal(t, expGeneratedQueries[i], queries[i]) + } +} + +func TestInvalidQueryTemplate(t *testing.T) { + queriesAsString := ` + query1 -- comment (this should be removed) + query1-continue + query1-finished; -- + + + query2; + query-3 query-3-continue query-3-finished -- missing semicolon + ` + sc := Creator{} + queriesAsBytes := []byte(queriesAsString) + _, err := sc.getQueriesFromBytes(queriesAsBytes) + require.Error(t, err) +} diff --git a/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl b/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl new file mode 100644 index 00000000000..41ea7494274 --- /dev/null +++ b/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl @@ -0,0 +1,200 @@ +CREATE KEYSPACE IF NOT EXISTS {{.Keyspace}} WITH replication = {{.Replication}}; + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.log ( + ts bigint, -- microseconds since epoch + fields frozen>> +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.process ( + service_name text, + tags frozen>> +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, -- microseconds since epoch + duration bigint, -- microseconds + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '{{.CompactionWindowInMinutes}}', + 'compaction_window_unit': 'MINUTES', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_name_index ( + service_name text, + bucket int, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.duration_index ( + service_name text, -- service name + operation_name text, -- operation name, or blank for queries without span name + bucket timestamp, -- time bucket, - the start_time of the given span rounded to an hour + duration bigint, -- span duration, in microseconds + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, -- microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTLInSeconds}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.dependency ( + parent text, + child text, + call_count bigint, + source text +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.DependenciesTTLInSeconds}}; + +-- adaptive sampling tables +-- ./plugin/storage/cassandra/samplingstore/storage.go +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.operation_throughput ( + bucket int, + ts timeuuid, + throughput text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.sampling_probabilities ( + bucket int, + ts timeuuid, + hostname text, + probabilities text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +-- distributed lock +-- ./plugin/pkg/distributedlock/cassandra/lock.go +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.leases ( + name text, + owner text, + PRIMARY KEY (name) +); \ No newline at end of file diff --git a/scripts/cassandra-integration-test.sh b/scripts/cassandra-integration-test.sh index 9f5e295bc3c..3789017d419 100755 --- a/scripts/cassandra-integration-test.sh +++ b/scripts/cassandra-integration-test.sh @@ -11,6 +11,9 @@ success="false" timeout=600 end_time=$((SECONDS + timeout)) +SKIP_APPLY_SCHEMA=${SKIP_APPLY_SCHEMA:-"false"} +export CASSANDRA_CREATE_SCHEMA=${SKIP_APPLY_SCHEMA} + usage() { echo $"Usage: $0 " exit 1 @@ -98,8 +101,10 @@ run_integration_test() { healthcheck_cassandra "${major_version}" - apply_schema "$schema_version" "$primaryKeyspace" - apply_schema "$schema_version" "$archiveKeyspace" + if [ "${SKIP_APPLY_SCHEMA}" = "false" ]; then + apply_schema "$schema_version" "$primaryKeyspace" + apply_schema "$schema_version" "$archiveKeyspace" + fi if [ "${jaegerVersion}" = "v1" ]; then STORAGE=cassandra make storage-integration-test