Skip to content

Commit

Permalink
Added cluster migration and migration template files.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kugelschieber committed Nov 5, 2024
1 parent d05b855 commit ecc3bc0
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 12 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## 6.18.0

**This release contains backwards incompatible changes!**
**This release contains backward incompatible changes!**

* added cluster support for ClickHouse

Expand Down
3 changes: 3 additions & 0 deletions pkg/db/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type ClientConfig struct {
// Port is the database port.
Port int

// Cluster is the optional database cluster to use.
Cluster string

// Database is the database schema.
Database string

Expand Down
46 changes: 36 additions & 10 deletions pkg/db/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"bufio"
"bytes"
"database/sql"
"embed"
"errors"
Expand All @@ -10,6 +11,7 @@ import (
"path/filepath"
"strconv"
"strings"
"text/template"
"time"
)

Expand All @@ -30,7 +32,7 @@ func Migrate(config *ClientConfig) error {
return err
}

if err := createMigrationsTable(client); err != nil {
if err := createMigrationsTable(client, config.Cluster); err != nil {
return err
}

Expand All @@ -40,14 +42,14 @@ func Migrate(config *ClientConfig) error {
return err
}

if err := runMigrations(client, version); err != nil {
if err := runMigrations(client, version, config.Cluster); err != nil {
return err
}

return client.Close()
}

func createMigrationsTable(client *Client) error {
func createMigrationsTable(client *Client, cluster string) error {
table := ""
err := client.QueryRow("SHOW TABLES LIKE 'schema_migrations'").Scan(&table)

Expand All @@ -56,7 +58,15 @@ func createMigrationsTable(client *Client) error {
}

if table == "" {
if _, err := client.DB.Exec("CREATE TABLE schema_migrations (version Int64, dirty UInt8, sequence UInt64) Engine=TinyLog"); err != nil {
query := ""

if cluster != "" {
query = fmt.Sprintf("CREATE TABLE schema_migrations ON CLUSTER '%s' (version Int64, dirty UInt8, sequence UInt64) Engine=ReplicatedMergeTree ORDER BY (version, dirty, sequence)", cluster)
} else {
query = "CREATE TABLE schema_migrations (version Int64, dirty UInt8, sequence UInt64) Engine=MergeTree ORDER BY (version, dirty, sequence)"
}

if _, err := client.DB.Exec(query); err != nil {
return err
}
}
Expand Down Expand Up @@ -87,14 +97,14 @@ func setMigrationVersion(client *Client, version int, dirty bool) error {
return err
}

func runMigrations(client *Client, version int) error {
func runMigrations(client *Client, version int, cluster string) error {
files, err := migrationFiles.ReadDir("schema")

if err != nil {
return err
}

migrations, err := loadMigrations(files, version)
migrations, err := loadMigrations(files, version, cluster)

if err != nil {
return err
Expand All @@ -119,7 +129,7 @@ func runMigrations(client *Client, version int) error {
return nil
}

func loadMigrations(files []fs.DirEntry, version int) ([]migration, error) {
func loadMigrations(files []fs.DirEntry, version int, cluster string) ([]migration, error) {
migrations := make([]migration, 0)

for _, f := range files {
Expand All @@ -130,7 +140,7 @@ func loadMigrations(files []fs.DirEntry, version int) ([]migration, error) {
return nil, err
}

statements, err := parseStatements(f.Name())
statements, err := parseStatements(f.Name(), cluster)

if err != nil {
return nil, err
Expand Down Expand Up @@ -159,14 +169,30 @@ func parseVersion(name string) (int, error) {
return int(version), err
}

func parseStatements(name string) ([]string, error) {
func parseStatements(name, cluster string) ([]string, error) {
content, err := fs.ReadFile(migrationFiles, filepath.Join("schema", name))

if err != nil {
return nil, fmt.Errorf("error reading migration file: %s", err)
}

scanner := bufio.NewScanner(strings.NewReader(string(content)))
tpl, err := template.New("").Parse(string(content))

if err != nil {
return nil, fmt.Errorf("error parsing migration template: %s", err)
}

var out bytes.Buffer

if err := tpl.Execute(&out, struct {
Cluster string
}{
Cluster: cluster,
}); err != nil {
return nil, fmt.Errorf("error executing migration template: %s", err)
}

scanner := bufio.NewScanner(strings.NewReader(out.String()))
var buffer strings.Builder

for scanner.Scan() {
Expand Down
68 changes: 67 additions & 1 deletion pkg/db/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,76 @@ func TestParseVersion(t *testing.T) {
}

func TestParseStatements(t *testing.T) {
statements, err := parseStatements("0013_remove_backup.up.sql")
statements, err := parseStatements("0013_remove_backup.up.sql", "")
assert.NoError(t, err)
assert.Len(t, statements, 3)
assert.Equal(t, "DROP TABLE IF EXISTS session_backup", statements[0])
assert.Equal(t, "DROP TABLE IF EXISTS page_view_backup", statements[1])
assert.Equal(t, "DROP TABLE IF EXISTS event_backup", statements[2])
}

func TestParseCluster(t *testing.T) {
statements, err := parseStatements("0029_distributed.up.sql", "pirsch")
assert.NoError(t, err)
assert.Len(t, statements, 27)
assert.Equal(t, "DROP TABLE IF EXISTS schema_migrations", statements[0])
assert.Equal(t, `CREATE TABLE schema_migrations ON CLUSTER 'pirsch' (
`+"`version`"+` Int64,
`+"`dirty`"+` UInt8,
`+"`sequence`"+` UInt64
)
ENGINE = ReplicatedMergeTree
ORDER BY (version, dirty, sequence)`, statements[1])
assert.Equal(t, `CREATE TABLE IF NOT EXISTS session ON CLUSTER 'pirsch' (
`+"`sign`"+` Int8,
`+"`version`"+` UInt16,
`+"`client_id`"+` UInt64,
`+"`visitor_id`"+` UInt64,
`+"`session_id`"+` UInt32,
`+"`time`"+` DateTime64(3, 'UTC'),
`+"`start`"+` DateTime64(3, 'UTC'),
`+"`duration_seconds`"+` UInt32,
`+"`is_bounce`"+` Int8,
`+"`entry_path`"+` String,
`+"`exit_path`"+` String,
`+"`entry_title`"+` String,
`+"`exit_title`"+` String,
`+"`page_views`"+` UInt16,
`+"`language`"+` LowCardinality(String),
`+"`country_code`"+` LowCardinality(FixedString(2)),
`+"`region`"+` LowCardinality(String),
`+"`city`"+` String,
`+"`referrer`"+` String,
`+"`referrer_name`"+` String,
`+"`referrer_icon`"+` String,
`+"`os`"+` LowCardinality(String),
`+"`os_version`"+` LowCardinality(String),
`+"`browser`"+` LowCardinality(String),
`+"`browser_version`"+` LowCardinality(String),
`+"`desktop`"+` Int8,
`+"`mobile`"+` Int8,
`+"`screen_class`"+` LowCardinality(String),
`+"`utm_source`"+` String,
`+"`utm_medium`"+` String,
`+"`utm_campaign`"+` String,
`+"`utm_content`"+` String,
`+"`utm_term`"+` String,
`+"`extended`"+` UInt16 DEFAULT 0,
`+"`hostname`"+` String
)
ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/tables/session/{shard}', '{replica}', sign, version)
PARTITION BY toYYYYMM(time)
ORDER BY (client_id, visitor_id, session_id, time)
SAMPLE BY visitor_id
SETTINGS index_granularity = 8192`, statements[8])
assert.Equal(t, `CREATE TABLE IF NOT EXISTS imported_browser ON CLUSTER 'pirsch' (
`+"`client_id`"+` UInt64,
`+"`date`"+` Date,
`+"`browser`"+` String,
`+"`visitors`"+` UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/imported_browser/{shard}', '{replica}')
PARTITION BY toYYYYMM(date)
ORDER BY (client_id, date)
SETTINGS index_granularity = 8192`, statements[12])
}
Loading

0 comments on commit ecc3bc0

Please sign in to comment.