Skip to content

Commit

Permalink
chore(jobsdb): add support for schema migration changesets to run always
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 29, 2022
1 parent 4e8df6f commit 66caa37
Show file tree
Hide file tree
Showing 25 changed files with 73 additions and 181 deletions.
46 changes: 43 additions & 3 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,50 @@ func (jd *HandleT) init() {
// so that we can be sure that all the necessary tables are created and considered to be in
// the latest schema version, before rudder-migrator starts introducing new tables.
jd.dsListLock.WithLock(func(l lock.LockToken) {
switch jd.ownerType {
case Write, ReadWrite:
jd.setupDatabaseTables(l, jd.clearAll)

// TODO: we need to make sure that no other process will be able to add or remove tables
// during the migration, otherwise schema migration may panic and db will be left in a dirty
// state.
// An advisory lock, shared between schema migration, internal migration and
// add new ds loop might be necessary.

writer := jd.ownerType == Write || jd.ownerType == ReadWrite
if writer && jd.clearAll {
jd.dropDatabaseTables(l)
}
templateData := func() map[string]interface{} {
// Important: if jobsdb type is acting as a writer then refreshDSList
// doesn't return the full list of datasets, only the rightmost two.
// But we need to run the schema migration against all datasets, no matter
// whether jobsdb is a writer or not.
datasets := getDSList(jd, jd.dbHandle, jd.tablePrefix)

datasetIndices := make([]string, 0)
for _, dataset := range datasets {
datasetIndices = append(datasetIndices, dataset.Index)
}

return map[string]interface{}{
"Prefix": jd.tablePrefix,
"Datasets": datasetIndices,
}
}()

if writer {
jd.setupDatabaseTables(templateData)
}

// Run changesets that should always run for both writer and reader jobsdbs.
//
// When running separate gw and processor instances we cannot control the order of execution
// and we cannot guarantee that while gw migration is running or after it is complete, processor
// will not create new tables using the old schema.
//
// Changesets that run always can help in such cases, by bringing non-migrated tables into a usable state.
jd.runAlwaysChangesets(templateData)

// finally refresh the dataset list to make sure [datasetList] field is populated
jd.refreshDSList(l)
})
}

Expand Down
54 changes: 16 additions & 38 deletions jobsdb/setup.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package jobsdb

import (
"database/sql"
"fmt"

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/misc"
)

// SchemaMigrationTable returns the table name used for storing current schema version.
Expand All @@ -20,50 +18,30 @@ func (jd *HandleT) SchemaMigrationTable() string {
// The following data are passed to JobsDB migration templates:
// - Prefix: The table prefix used by this jobsdb instance.
// - Datasets: Array of existing dataset indices.
// If clearAll is set to true, all existing jobsdb tables will be removed first.
func (jd *HandleT) setupDatabaseTables(l lock.LockToken, clearAll bool) {
if clearAll {
jd.dropDatabaseTables(l)
}

// Important: if jobsdb type is acting as a writer then refreshDSList
// doesn't return the full list of datasets, only the rightmost two.
// But we need to run the schema migration against all datasets, no matter
// whether jobsdb is a writer or not.
datasets := getDSList(jd, jd.dbHandle, jd.tablePrefix)

datasetIndices := make([]string, 0)
for _, dataset := range datasets {
datasetIndices = append(datasetIndices, dataset.Index)
}

templateData := map[string]interface{}{
"Prefix": jd.tablePrefix,
"Datasets": datasetIndices,
}

psqlInfo := misc.GetConnectionString()
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
panic(fmt.Errorf("error DB for migrate open: %w", err))
}

defer func() { _ = db.Close() }()

func (jd *HandleT) setupDatabaseTables(templateData map[string]interface{}) {
// setup migrator with appropriate schema migrations table
m := &migrator.Migrator{
Handle: db,
Handle: jd.dbHandle,
MigrationsTable: jd.SchemaMigrationTable(),
ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true),
}

// execute any necessary migrations
err = m.MigrateFromTemplates("jobsdb", templateData)
if err != nil {
if err := m.MigrateFromTemplates("jobsdb", templateData); err != nil {
panic(fmt.Errorf("error while migrating '%v' jobsdb tables: %w", jd.tablePrefix, err))
}
// finally refresh the dataset list to make sure [datasetList] field is populated
jd.refreshDSList(l)
}

func (jd *HandleT) runAlwaysChangesets(templateData map[string]interface{}) {
// setup migrator with appropriate schema migrations table
m := &migrator.Migrator{
Handle: jd.dbHandle,
MigrationsTable: fmt.Sprintf("%s_runalways_migrations", jd.tablePrefix),
RunAlways: true,
}
// execute any necessary migrations
if err := m.MigrateFromTemplates("jobsdb_always", templateData); err != nil {
panic(fmt.Errorf("error while running changesets that run always in '%s' jobsdb tables: %w", jd.tablePrefix, err))
}
}

func (jd *HandleT) dropDatabaseTables(l lock.LockToken) {
Expand Down
12 changes: 10 additions & 2 deletions services/sql-migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type Migrator struct {
// Indicates if migration version should be force reset to latest on file in case of revert to lower version
// Eg. DB has v3 set in MigrationsTable but latest version in MigrationsDir is v2
ShouldForceSetLowerVersion bool

// Indicates if all migrations should be run ignoring the current version in MigrationsTable
RunAlways bool
}

var pkgLogger logger.Logger
Expand Down Expand Up @@ -146,12 +149,17 @@ func (m *Migrator) MigrateFromTemplates(templatesDir string, context interface{}
}

if m.ShouldForceSetLowerVersion {
err := m.forceSetLowerVersion(migration, sourceDriver, destinationDriver)
if err != nil {
if err := m.forceSetLowerVersion(migration, sourceDriver, destinationDriver); err != nil {
return err
}
}

if m.RunAlways {
if err := migration.Force(-1); err != nil {
return fmt.Errorf("force migration version to 0: %w", err)
}
}

err = migration.Up()
if err != nil && err != migrate.ErrNoChange { // migrate library reports that no change was required, using ErrNoChange
return fmt.Errorf("run migration from template directory %q, %w", templatesDir, err)
Expand Down
1 change: 0 additions & 1 deletion sql/migrations/config_cache/000001_init_schema.down.sql

This file was deleted.

12 changes: 0 additions & 12 deletions sql/migrations/jobsdb/000001_create_tables.down.tmpl

This file was deleted.

22 changes: 0 additions & 22 deletions sql/migrations/jobsdb/000002_alter_dataset_tables.down.tmpl

This file was deleted.

2 changes: 0 additions & 2 deletions sql/migrations/jobsdb/000003_alter_journal_table.down.tmpl

This file was deleted.

4 changes: 0 additions & 4 deletions sql/migrations/jobsdb/000004_alter_status_table.down.tmpl

This file was deleted.

3 changes: 0 additions & 3 deletions sql/migrations/jobsdb/000005_alter_dataset_table.down.tmpl

This file was deleted.

3 changes: 0 additions & 3 deletions sql/migrations/jobsdb/000006_alter_dataset_table.down.tmpl

This file was deleted.

5 changes: 0 additions & 5 deletions sql/migrations/jobsdb/000007_add_index_rt_table.down.tmpl

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{{range .Datasets}}
CREATE INDEX IF NOT EXISTS "idx_{{$.Prefix}}_job_status_{{.}}_jid_id" ON "{{$.Prefix}}_job_status_{{.}}" (job_id asc,id desc);
CREATE OR REPLACE VIEW "v_last_{{$.Prefix}}_job_status_{{.}}" AS SELECT DISTINCT ON (job_id) * FROM "{{$.Prefix}}_job_status_{{.}}" ORDER BY job_id ASC,id DESC;
{{end}}
16 changes: 0 additions & 16 deletions sql/migrations/node/000001_create_event_schema.down.sql

This file was deleted.

This file was deleted.

This file was deleted.

5 changes: 0 additions & 5 deletions sql/migrations/node/000004_create_ops.down.sql

This file was deleted.

This file was deleted.

13 changes: 0 additions & 13 deletions sql/migrations/node/000007_remove_ops.down.sql

This file was deleted.

1 change: 0 additions & 1 deletion sql/migrations/node/000008_unionjobsdb_fn.down.sql

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit 66caa37

Please sign in to comment.