Skip to content

Commit

Permalink
Create Cassandra db schema on session initialization (#5922)
Browse files Browse the repository at this point in the history
Create Schema (if not present) on Session Initialization

Once a session is established with cassandra db, the added code parses
the template file containing queries for creating schema and create
queries out of it. Post which it executes those queries to create the
required types and tables.

## Which problem is this PR solving?
Resolves #5797 

## Description of the changes
- The PR includes the following changes:
- 1. Embedding template files into binary
- 2. Creation of database schema in initialization steps once session to
database is established.

## How was this change tested?
-  Schema rendering is being tested with unit test.
-  bash scripts/cassandra-integration-test.sh -s 4 v004 v2

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Alok Kumar Singh <[email protected]>
Signed-off-by: Alok Kumar Singh <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2024
1 parent 2e5ed05 commit e9fac05
Show file tree
Hide file tree
Showing 13 changed files with 662 additions and 90 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/ci-e2e-cassandra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ jobs:
fail-fast: false
matrix:
jaeger-version: [v1, v2]
create-schema: [manual, auto]
version:
- distribution: cassandra
major: 4.x
schema: v004
- distribution: cassandra
major: 5.x
schema: v004
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }}
exclude:
# Exclude v1 as create schema on fly is available for v2 only
- jaeger-version: v1
create-schema: auto
name: ${{ matrix.version.distribution }}-${{ matrix.version.major }} ${{ matrix.jaeger-version }} schema=${{ matrix.create-schema }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
Expand All @@ -45,9 +50,11 @@ jobs:
- name: Run cassandra integration tests
id: test-execution
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }}
env:
SKIP_APPLY_SCHEMA: ${{ matrix.create-schema == 'auto' && true || false }}

- name: Upload coverage to codecov
uses: ./.github/actions/upload-codecov
with:
files: cover.out
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}-${{ matrix.create-schema }}
4 changes: 3 additions & 1 deletion cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
connection:
auth:
basic:
Expand All @@ -44,7 +45,8 @@ extensions:
another_storage:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
keyspace: "jaeger_v1_dc1_archive"
create: "${env:CASSANDRA_CREATE_SCHEMA:-true}"
connection:
auth:
basic:
Expand Down
86 changes: 63 additions & 23 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ package config

import (
"context"
"errors"
"fmt"
"time"

"github.com/asaskevich/govalidator"
"github.com/gocql/gocql"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/pkg/cassandra"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
)

// Configuration describes the configuration properties needed to connect to a Cassandra cluster.
Expand Down Expand Up @@ -58,6 +56,19 @@ type Schema struct {
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
// CreateSchema tells if the schema ahould be created during session initialization based on the configs provided
CreateSchema bool `mapstructure:"create" valid:"optional"`
// Datacenter is the name for network topology
Datacenter string `mapstructure:"datacenter" valid:"optional"`
// TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second
TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"`
// DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second
DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"`
// Replication factor for the db
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
// CompactionWindow is the size of the window for TimeWindowCompactionStrategy.
// All SSTables within that window are grouped together into one SSTable.
CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"`
}

type Query struct {
Expand Down Expand Up @@ -86,7 +97,13 @@ type BasicAuthenticator struct {
func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
CreateSchema: false,
Keyspace: "jaeger_dc1",
Datacenter: "dc1",
TraceTTL: 2 * 24 * time.Hour,
DependenciesTTL: 2 * 24 * time.Hour,
ReplicationFactor: 1,
CompactionWindow: time.Minute,
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Expand All @@ -106,6 +123,27 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}

if c.Schema.Datacenter == "" {
c.Schema.Datacenter = source.Schema.Datacenter
}

if c.Schema.TraceTTL == 0 {
c.Schema.TraceTTL = source.Schema.TraceTTL
}

if c.Schema.DependenciesTTL == 0 {
c.Schema.DependenciesTTL = source.Schema.DependenciesTTL
}

if c.Schema.ReplicationFactor == 0 {
c.Schema.ReplicationFactor = source.Schema.ReplicationFactor
}

if c.Schema.CompactionWindow == 0 {
c.Schema.CompactionWindow = source.Schema.CompactionWindow
}

if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
Expand All @@ -129,24 +167,6 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// SessionBuilder creates new cassandra.Session
type SessionBuilder interface {
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}
session, err := cluster.CreateSession()
if err != nil {
return nil, err
}
return gocqlw.WrapCQLSession(session), nil
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
Expand Down Expand Up @@ -210,7 +230,27 @@ func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}

func isValidTTL(duration time.Duration) bool {
return duration == 0 || duration >= time.Second
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
if err != nil {
return err
}

if !isValidTTL(c.Schema.TraceTTL) {
return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second")
}

if !isValidTTL(c.Schema.DependenciesTTL) {
return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second")
}

if c.Schema.CompactionWindow < time.Minute {
return errors.New("compaction_window should at least be 1 minute")
}

return nil
}
24 changes: 24 additions & 0 deletions pkg/cassandra/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package config

import (
"testing"
"time"

"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
Connection: Connection{
Servers: []string{"localhost:9200"},
},
Schema: Schema{
CompactionWindow: time.Minute,
},
}

err := cfg.Validate()
Expand Down Expand Up @@ -94,3 +98,23 @@ func TestToString(t *testing.T) {
s := cfg.String()
assert.Contains(t, s, "Keyspace:test")
}

func TestConfigSchemaValidation(t *testing.T) {
cfg := DefaultConfiguration()
err := cfg.Validate()
require.NoError(t, err)

cfg.Schema.TraceTTL = time.Millisecond
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.TraceTTL = time.Second
cfg.Schema.CompactionWindow = time.Minute - 1
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.CompactionWindow = time.Minute
cfg.Schema.DependenciesTTL = time.Second - 1
err = cfg.Validate()
require.Error(t, err)
}
65 changes: 56 additions & 9 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (

"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -55,17 +57,22 @@ type Factory struct {
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primaryConfig config.Configuration
archiveConfig *config.Configuration

primarySession cassandra.Session
archiveConfig config.SessionBuilder
archiveSession cassandra.Session

// tests can override this
sessionBuilderFn func(*config.Configuration) (cassandra.Session, error)
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
sessionBuilderFn: NewSession,
}
}

Expand Down Expand Up @@ -126,9 +133,7 @@ func (f *Factory) configureFromOptions(o *Options) {
o.others = make(map[string]*NamespaceConfig)
}
f.primaryConfig = o.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
}
f.archiveConfig = f.Options.Get(archiveStorageConfig)
}

// Initialize implements storage.Factory
Expand All @@ -137,14 +142,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil})
f.logger = logger

primarySession, err := f.primaryConfig.NewSession()
primarySession, err := f.sessionBuilderFn(&f.primaryConfig)
if err != nil {
return err
}
f.primarySession = primarySession

if f.archiveConfig != nil {
archiveSession, err := f.archiveConfig.NewSession()
archiveSession, err := f.sessionBuilderFn(f.archiveConfig)
if err != nil {
return err
}
Expand All @@ -155,6 +160,48 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
return nil
}

// createSession creates session from a configuration
func createSession(c *config.Configuration) (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

return gocqlw.WrapCQLSession(session), nil
}

// newSessionPrerequisites creates tables and types before creating a session
func newSessionPrerequisites(c *config.Configuration) error {
if !c.Schema.CreateSchema {
return nil
}

cfg := *c // clone because we need to connect without specifying a keyspace
cfg.Schema.Keyspace = ""

session, err := createSession(&cfg)
if err != nil {
return err
}

sc := schema.NewSchemaCreator(session, c.Schema)
return sc.CreateSchemaIfNotPresent()
}

// NewSession creates a new Cassandra session
func NewSession(c *config.Configuration) (cassandra.Session, error) {
if err := newSessionPrerequisites(c); err != nil {
return nil, err
}

return createSession(c)
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
Expand Down
Loading

0 comments on commit e9fac05

Please sign in to comment.