Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Cassandra db schema on session initialization #5922

Merged
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
5041f31
Embeded template file in binary and added query construction and exec…
akstron Oct 28, 2024
30db170
Removed unnecessary SchemaConfig struct
akstron Oct 28, 2024
ce0c375
Added new schema configs in default config generator
akstron Oct 28, 2024
810ab1c
Revert Keyspace removal
akstron Oct 28, 2024
0d6383f
Bug fix while creating queries
akstron Oct 29, 2024
207945f
Improving test
akstron Oct 29, 2024
985f65b
Created new struct for derived params
akstron Oct 29, 2024
1c30503
Remove fields from yaml file
akstron Oct 29, 2024
e4ab709
Added integration test
akstron Nov 19, 2024
c329bba
Rebase fixes
akstron Nov 19, 2024
e3c6045
Minor changes in integration script
akstron Nov 19, 2024
492e15e
removed test
akstron Nov 19, 2024
44c39dc
Updated fields with time.Duration type and added validators and tests
akstron Nov 20, 2024
dfc0c43
minor changes in script
akstron Nov 20, 2024
cb8ae19
Addressed comments
akstron Nov 20, 2024
c3d0fbd
Addressed comments
akstron Nov 21, 2024
728a139
Update pkg/cassandra/config/schema.go
akstron Nov 21, 2024
1b6683d
Update pkg/cassandra/config/config.go
akstron Nov 21, 2024
ce11cc1
Addressed comments
akstron Nov 21, 2024
de1c563
Removed unused CasVersion
akstron Nov 21, 2024
edabe22
Addressed validation comments
akstron Nov 22, 2024
d0e1976
Created helper function for session created and updated tests
akstron Nov 26, 2024
d8479b5
Added schema unit tests
akstron Nov 26, 2024
02b6159
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
73d276a
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
57349a8
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
84b52e1
Fixed build
akstron Nov 26, 2024
9c2f05b
formatting fixes
akstron Nov 26, 2024
2c8de88
test fix
akstron Nov 27, 2024
eeb1951
Added test in workflow
akstron Nov 28, 2024
601365d
fmt fixes
akstron Nov 28, 2024
4aeaad7
create schema bug fix
akstron Nov 28, 2024
0167cb6
exclude v1 run with skip-apply-schema as true
akstron Nov 28, 2024
07b99da
Added schemaCreator and comments in workflow
akstron Nov 28, 2024
04ec76f
ci changes
akstron Nov 28, 2024
7246494
made template params private
akstron Nov 28, 2024
d8613f1
workflow fix
akstron Nov 28, 2024
a7853ec
Changed env variable name
akstron Nov 29, 2024
10bd9aa
lint fixes
akstron Nov 29, 2024
a682db2
lint fix
akstron Nov 29, 2024
35c26b5
test fix
akstron Nov 29, 2024
ddf4fc0
Workflow and test minor changes
akstron Nov 29, 2024
bbf3ac8
test fix
akstron Nov 30, 2024
a8e3aae
workflow changes
akstron Nov 30, 2024
a46bf37
Merge branch 'main' into create-database-scheme-cassandra
akstron Nov 30, 2024
c2b89e0
Apply suggestions from code review
yurishkuro Nov 30, 2024
de2d2ef
Update docs
yurishkuro Nov 30, 2024
100208c
refactor
yurishkuro Nov 30, 2024
37d34dd
clean-up imports
yurishkuro Nov 30, 2024
8d65894
clean-up
yurishkuro Nov 30, 2024
23a9b62
simplify
yurishkuro Nov 30, 2024
d925074
fix
yurishkuro Nov 30, 2024
154deb9
Fix workflow
yurishkuro Nov 30, 2024
a238455
Merge branch 'main' into create-database-scheme-cassandra
yurishkuro Nov 30, 2024
cc9db9c
rename
yurishkuro Nov 30, 2024
9d8ba06
Fix script
yurishkuro Nov 30, 2024
b0ac38a
fix naming for code coverage
yurishkuro Nov 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/ci-e2e-cassandra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ jobs:
fail-fast: false
matrix:
jaeger-version: [v1, v2]
create-schema: [manual, auto]
version:
- distribution: cassandra
major: 4.x
schema: v004
- distribution: cassandra
major: 5.x
schema: v004
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }}
exclude:
# Exclude v1 as create schema on fly is available for v2 only
- jaeger-version: v1
skip-apply-schema: true
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }} ${{ matrix.create-schema }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
Expand All @@ -45,6 +50,8 @@ jobs:
- name: Run cassandra integration tests
id: test-execution
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
env:
SKIP_APPLY_SCHEMA: ${{ matrix.create-schema == 'auto' && true || false }}

- name: Upload coverage to codecov
uses: ./.github/actions/upload-codecov
Expand Down
4 changes: 3 additions & 1 deletion cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
create: "${env:CASSANDRA_CREATE_SCHEMA:-false}"
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
connection:
auth:
basic:
Expand All @@ -44,7 +45,8 @@ extensions:
another_storage:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
keyspace: "jaeger_v1_dc1_archive"
create: "${env:CASSANDRA_CREATE_SCHEMA:-false}"
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
connection:
auth:
basic:
Expand Down
1 change: 1 addition & 0 deletions cmd/jaeger/internal/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

func TestCassandraStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "cassandra")

yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
s := &E2EStorageIntegration{
ConfigFile: "../../config-cassandra.yaml",
StorageIntegration: integration.StorageIntegration{
Expand Down
101 changes: 97 additions & 4 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package config

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -58,6 +59,18 @@ type Schema struct {
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
// CreateSchema tells if the schema ahould be created during session initialization based on the configs provided
CreateSchema bool `mapstructure:"create" valid:"optional"`
// Datacenter is the name for network topology
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
Datacenter string `mapstructure:"datacenter" valid:"optional"`
// TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second
TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"`
// DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second
DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"`
// Replication factor for the db
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
// CompactionWindow of format tells the compaction window of the db. Should atleast be 1 minute
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"`
}

type Query struct {
Expand Down Expand Up @@ -86,7 +99,13 @@ type BasicAuthenticator struct {
func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
CreateSchema: false,
Keyspace: "jaeger_dc1",
Datacenter: "dc1",
TraceTTL: 2 * 24 * time.Hour,
DependenciesTTL: 2 * 24 * time.Hour,
ReplicationFactor: 1,
CompactionWindow: time.Minute,
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
Expand All @@ -106,6 +125,27 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}

if c.Schema.Datacenter == "" {
c.Schema.Datacenter = source.Schema.Datacenter
}

if c.Schema.TraceTTL == 0 {
c.Schema.TraceTTL = source.Schema.TraceTTL
}

if c.Schema.DependenciesTTL == 0 {
c.Schema.DependenciesTTL = source.Schema.DependenciesTTL
}

if c.Schema.ReplicationFactor == 0 {
c.Schema.ReplicationFactor = source.Schema.ReplicationFactor
}

if c.Schema.CompactionWindow == 0 {
c.Schema.CompactionWindow = source.Schema.CompactionWindow
}

if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
Expand Down Expand Up @@ -134,19 +174,52 @@ type SessionBuilder interface {
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
// createSession creates session from a configuration
func createSession(c *Configuration) (cassandra.Session, error) {
cluster, err := c.NewCluster()
akstron marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

return gocqlw.WrapCQLSession(session), nil
}

// newSessionPrerequisites creates tables and types before creating a session
func (c *Configuration) newSessionPrerequisites() error {
if !c.Schema.CreateSchema {
return nil
}

cfg := *c // clone because we need to connect without specifying a keyspace
cfg.Schema.Keyspace = ""

session, err := createSession(&cfg)
if err != nil {
return err
}

sc := schemaCreator{
session: session,
schema: c.Schema,
}

return sc.createSchemaIfNotPresent()
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
if err := c.newSessionPrerequisites(); err != nil {
return nil, err
}

return createSession(c)
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
Expand Down Expand Up @@ -210,7 +283,27 @@ func (c *Configuration) String() string {
return fmt.Sprintf("%+v", *c)
}

func isValidTTL(duration time.Duration) bool {
return duration == 0 || duration >= time.Second
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
if err != nil {
return err
}

if !isValidTTL(c.Schema.TraceTTL) {
return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second")
}

if !isValidTTL(c.Schema.DependenciesTTL) {
return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second")
}

if c.Schema.CompactionWindow < time.Minute {
return errors.New("compaction_window should at least be 1 minute")
}

return nil
}
24 changes: 24 additions & 0 deletions pkg/cassandra/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package config

import (
"testing"
"time"

"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
Connection: Connection{
Servers: []string{"localhost:9200"},
},
Schema: Schema{
CompactionWindow: time.Minute,
},
}

err := cfg.Validate()
Expand Down Expand Up @@ -94,3 +98,23 @@ func TestToString(t *testing.T) {
s := cfg.String()
assert.Contains(t, s, "Keyspace:test")
}

func TestConfigSchemaValidation(t *testing.T) {
cfg := DefaultConfiguration()
err := cfg.Validate()
require.NoError(t, err)

cfg.Schema.TraceTTL = time.Millisecond
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.TraceTTL = time.Second
cfg.Schema.CompactionWindow = time.Minute - 1
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.CompactionWindow = time.Minute
cfg.Schema.DependenciesTTL = time.Second - 1
err = cfg.Validate()
require.Error(t, err)
}
145 changes: 145 additions & 0 deletions pkg/cassandra/config/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) 2024 The Jaeger Authors.
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: Apache-2.0

package config

import (
"bytes"
"embed"
"errors"
"fmt"
"text/template"
"time"

"github.com/jaegertracing/jaeger/pkg/cassandra"
)

//go:embed v004-go-tmpl.cql.tmpl
var schemaFile embed.FS

type templateParams struct {
// Keyspace in which tables and types will be created for storage
Keyspace string
// Replication is the replication strategy used. Ex: "{'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }"
Replication string
// CompactionWindowInMinutes is constructed from CompactionWindow for using in template
CompactionWindowInMinutes int64
// TraceTTLInSeconds is constructed from TraceTTL for using in template
TraceTTLInSeconds int64
// DependenciesTTLInSeconds is constructed from DependenciesTTL for using in template
DependenciesTTLInSeconds int64
}

type schemaCreator struct {
session cassandra.Session
schema Schema
}

func (sc *schemaCreator) constructTemplateParams() templateParams {
return templateParams{
Keyspace: sc.schema.Keyspace,
Replication: fmt.Sprintf("{'class': 'NetworkTopologyStrategy', 'replication_factor': '%v' }", sc.schema.ReplicationFactor),
CompactionWindowInMinutes: int64(sc.schema.CompactionWindow / time.Minute),
TraceTTLInSeconds: int64(sc.schema.TraceTTL / time.Second),
DependenciesTTLInSeconds: int64(sc.schema.DependenciesTTL / time.Second),
}
}

func (*schemaCreator) getQueryFileAsBytes(fileName string, params templateParams) ([]byte, error) {
tmpl, err := template.ParseFS(schemaFile, fileName)
if err != nil {
return nil, err
}

var result bytes.Buffer
err = tmpl.Execute(&result, params)
if err != nil {
return nil, err
}

return result.Bytes(), nil
}

func (*schemaCreator) getQueriesFromBytes(queryFile []byte) ([]string, error) {
lines := bytes.Split(queryFile, []byte("\n"))

var extractedLines [][]byte

for _, line := range lines {
// Remove any comments, if at the end of the line
commentIndex := bytes.Index(line, []byte(`--`))
if commentIndex != -1 {
// remove everything after comment
line = line[0:commentIndex]
}

trimmedLine := bytes.TrimSpace(line)

if len(trimmedLine) == 0 {
continue
}

extractedLines = append(extractedLines, trimmedLine)
}

var queries []string

// Construct individual queries strings
var queryString string
for _, line := range extractedLines {
queryString += string(line) + "\n"
if bytes.HasSuffix(line, []byte(";")) {
queries = append(queries, queryString)
queryString = ""
}
}

if len(queryString) > 0 {
return nil, errors.New(`query exists in template without ";"`)
}

return queries, nil
}

func (sc *schemaCreator) getCassandraQueriesFromQueryStrings(queries []string) []cassandra.Query {
var casQueries []cassandra.Query

for _, query := range queries {
casQueries = append(casQueries, sc.session.Query(query))
}

return casQueries
}

func (sc *schemaCreator) contructSchemaQueries() ([]cassandra.Query, error) {
params := sc.constructTemplateParams()

queryFile, err := sc.getQueryFileAsBytes(`v004-go-tmpl.cql.tmpl`, params)
if err != nil {
return nil, err
}

queryStrings, err := sc.getQueriesFromBytes(queryFile)
if err != nil {
return nil, err
}

casQueries := sc.getCassandraQueriesFromQueryStrings(queryStrings)

return casQueries, nil
}

func (sc *schemaCreator) createSchemaIfNotPresent() error {
casQueries, err := sc.contructSchemaQueries()
if err != nil {
return err
}

for _, query := range casQueries {
if err := query.Exec(); err != nil {
return err
}
}

return nil
}
Loading
Loading