Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: store database code as code [DET-9180] #9302

Merged
merged 9 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion master/cmd/determined-master/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func runMigrate(cmd *cobra.Command, args []string) error {
}
}()

if _, err = database.Migrate(config.DB.Migrations, args); err != nil {
if _, err = database.Migrate(config.DB.Migrations, config.DB.ViewsAndTriggers, args); err != nil {
return errors.Wrap(err, "running migrations")
}

Expand Down
2 changes: 1 addition & 1 deletion master/internal/api_user_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func setupAPITest(t *testing.T, pgdb *db.PgDB,

if pgdb == nil {
if thePgDB == nil {
thePgDB = db.MustResolveTestPostgres(t)
thePgDB, _ = db.MustResolveTestPostgres(t)
db.MustMigrateTestPostgres(t, thePgDB, "file://../static/migrations")
require.NoError(t, etc.SetRootPath("../static/srv"))

Expand Down
3 changes: 2 additions & 1 deletion master/internal/cache/cache_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

func TestCache(t *testing.T) {
require.NoError(t, etc.SetRootPath(db.RootFromDB))
dbIns := db.MustResolveTestPostgres(t)
dbIns, close := db.MustResolveTestPostgres(t)
defer close()
db.MustMigrateTestPostgres(t, dbIns, db.MigrationsFromDB)

user := db.RequireMockUser(t, dbIns)
Expand Down
2 changes: 1 addition & 1 deletion master/internal/checkpoint_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func TestRunCheckpointGCTask(t *testing.T) {
pgDB := db.MustResolveTestPostgres(t)
pgDB, _ := db.MustResolveTestPostgres(t)
db.MustMigrateTestPostgres(t, pgDB, "file://../static/migrations")
user := db.RequireMockUser(t, pgDB)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func sortUUIDSlice(uuids []uuid.UUID) {
}

func TestMain(m *testing.M) {
pgDB, err := db.ResolveTestPostgres()
pgDB, _, err := db.ResolveTestPostgres()
if err != nil {
log.Panicln(err)
}
Expand Down
5 changes: 2 additions & 3 deletions master/internal/command/command_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,20 @@ func TestTensorboardManagerLifecycle(t *testing.T) {
}

func setupTest(t *testing.T) *db.PgDB {
pgDB := db.MustSetupTestPostgres(t)
// First init the new Command Service
var mockRM mocks.ResourceManager
sub := sproto.NewAllocationSubscription(queue.New[sproto.ResourcesEvent](), func() {})
mockRM.On("Allocate", mock.Anything, mock.Anything).Return(sub, nil)
mockRM.On("Release", mock.Anything, mock.Anything).Return(nil)
mockRM.On("SetGroupPriority", mock.Anything, mock.Anything).Return(nil)

cs, _ := NewService(pgDB, &mockRM)
cs, _ := NewService(db.SingleDB(), &mockRM)
SetDefaultService(cs)

jobservice.SetDefaultService(&mockRM)

require.NotNil(t, DefaultCmdService)
return pgDB
return db.SingleDB()
}

func CreateMockGenericReq(t *testing.T, pgDB *db.PgDB) *CreateGeneric {
Expand Down
2 changes: 1 addition & 1 deletion master/internal/command/postgres_command_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func TestMain(m *testing.M) {
pgDB, err := db.ResolveTestPostgres()
pgDB, _, err := db.ResolveTestPostgres()
if err != nil {
log.Panicln(err)
}
Expand Down
18 changes: 10 additions & 8 deletions master/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ type CacheConfig struct {

// DBConfig hosts configuration fields of the database.
type DBConfig struct {
User string `json:"user"`
Password string `json:"password"`
Migrations string `json:"migrations"`
Host string `json:"host"`
Port string `json:"port"`
Name string `json:"name"`
SSLMode string `json:"ssl_mode"`
SSLRootCert string `json:"ssl_root_cert"`
User string `json:"user"`
Password string `json:"password"`
Migrations string `json:"migrations"`
ViewsAndTriggers string `json:"views_and_triggers"`
Host string `json:"host"`
Port string `json:"port"`
Name string `json:"name"`
SSLMode string `json:"ssl_mode"`
SSLRootCert string `json:"ssl_root_cert"`
}

// WebhooksConfig hosts configuration fields for webhook functionality.
Expand Down Expand Up @@ -319,6 +320,7 @@ func (c *Config) Resolve() error {
c.Root = root

c.DB.Migrations = fmt.Sprintf("file://%s", filepath.Join(c.Root, "static/migrations"))
c.DB.ViewsAndTriggers = filepath.Join(c.Root, "static/views_and_triggers")

// We must resolve resources before we apply pool defaults.
if err := c.ResolveResource(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion master/internal/db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// DB is an interface for _all_ the functionality packed into the DB.
type DB interface {
Migrate(migrationURL string, actions []string) (isNew bool, err error)
Migrate(migrationURL, codeURL string, actions []string) (isNew bool, err error)
Close() error
GetOrCreateClusterID(telemetryID string) (string, error)
TrialExperimentAndRequestID(id int) (int, model.RequestID, error)
Expand Down
135 changes: 123 additions & 12 deletions master/internal/db/migrations.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package db

import (
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"

"github.com/go-pg/migrations/v8"
"github.com/go-pg/pg/v10"
"github.com/jackc/pgconn"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -111,8 +116,117 @@ func ensureMigrationUpgrade(tx *pg.Tx) error {
return nil
}

func (db *PgDB) readDBCodeAndCheckIfDifferent(dbCodeDir string) (map[string]string, bool, error) {
files, err := os.ReadDir(dbCodeDir)
if err != nil {
return nil, false, fmt.Errorf("reading '%s' directory for database views: %w", dbCodeDir, err)
}

allCode := ""
fileNamesToSQL := make(map[string]string)
for _, f := range files {
if filepath.Ext(f.Name()) != ".sql" {
continue
}

filePath := filepath.Join(dbCodeDir, f.Name())
b, err := os.ReadFile(filePath) //nolint: gosec // We trust dbCodeDir.
if err != nil {
return nil, false, fmt.Errorf("reading view definition file '%s': %w", filePath, err)
}
fileNamesToSQL[f.Name()] = string(b)
allCode += string(b)
}

// I didn't want to get into deciding when to apply database or code or not but integration
// tests make it really hard to not do this.
hash := sha256.Sum256([]byte(allCode))
ourHash := hex.EncodeToString(hash[:])

// Check if the views_and_triggers_hash table exists. If it doesn't return that we need to create db code.
var tableExists bool
if err = db.sql.QueryRow(
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'views_and_triggers_hash')").
Scan(&tableExists); err != nil {
return nil, false, fmt.Errorf("checking views_and_triggers_hash exists: %w", err)
}
if !tableExists {
return fileNamesToSQL, true, nil
}

// Check if our hashes match. If they do we can just return we don't need to do anything.
var databaseHash string
if err := db.sql.QueryRow("SELECT hash FROM views_and_triggers_hash").Scan(&databaseHash); err != nil {
return nil, false, fmt.Errorf("getting hash from views_and_triggers_hash: %w", err)
}
if databaseHash == ourHash {
return fileNamesToSQL, false, nil
}

// Update our hash and return we need to create views and triggers.
if _, err := db.sql.Exec("UPDATE views_and_triggers_hash SET hash = $1", ourHash); err != nil {
return nil, false, fmt.Errorf("updating our database hash: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we be returning true here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no just returning the zero value.

If an error is not nil the other return values should not be looked at. it is really hard to work with code that returns non nil errors and 'meaningful' values.

}
return fileNamesToSQL, false, nil
}

func (db *PgDB) addDBCode(fileNamesToSQL map[string]string) error {
if err := db.withTransaction("determined database views", func(tx *sqlx.Tx) error {
for filePath, sql := range fileNamesToSQL {
if _, err := tx.Exec(sql); err != nil {
return fmt.Errorf("running database view file '%s': %w", filePath, err)
}
}

return nil
}); err != nil {
return fmt.Errorf("adding determined database views: %w", err)
}

return nil
}

func (db *PgDB) dropDBCode() error {
// SET search_path since the ALTER DATABASE ... SET SEARCH_PATH won't apply to this connection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im confused by this comment - can you elaborate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// since it was created after the migration ran.
if _, err := db.sql.Exec(`
DROP SCHEMA IF EXISTS determined_code CASCADE;
CREATE SCHEMA determined_code;`); err != nil {
return fmt.Errorf("removing determined database views so they can be created later: %w", err)
}

return nil
}

// This is set in an init in postgres_test_utils.go behind the intg feature flag.
// For normal usages this won't build. For tests we need to serialize access to
// run migrations.
var testOnlyDBLock func(sql *sqlx.DB) (unlock func())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This took me a minute to figure out - a comment indicating it gets initialized and acquired in init() in a test module would be a clarity improvement IMO.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the way it was factored out, though - much cleaner

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment


// Migrate runs the migrations from the specified directory URL.
func (db *PgDB) Migrate(migrationURL string, actions []string) (isNew bool, err error) {
func (db *PgDB) Migrate(
migrationURL string, dbCodeDir string, actions []string,
) (isNew bool, err error) {
if testOnlyDBLock != nil {
// In integration tests, multiple processes can be running this code at once, which can lead to
// errors because PostgreSQL's CREATE TABLE IF NOT EXISTS is not great with concurrency.
cleanup := testOnlyDBLock(db.sql)
defer cleanup()
}

dbCodeFiles, needToUpdateDBCode, err := db.readDBCodeAndCheckIfDifferent(dbCodeDir)
if err != nil {
return false, err
}
if needToUpdateDBCode {
if err := db.dropDBCode(); err != nil {
return false, err
}
log.Info("database views changed")
} else {
log.Info("database views unchanged, will not updated")
}

// go-pg/migrations uses go-pg/pg connection API, which is not compatible
// with pgx, so we use a one-off go-pg/pg connection.
pgOpts, err := makeGoPgOpts(db.URL)
Expand All @@ -139,17 +253,6 @@ func (db *PgDB) Migrate(migrationURL string, actions []string) (isNew bool, err
}
}()

// In integration tests, multiple processes can be running this code at once, which can lead to
// errors because PostgreSQL's CREATE TABLE IF NOT EXISTS is not great with concurrency.

// Arbitrarily chosen unique consistent ID for the lock.
const MigrationLockID = 0x33ad0708c9bed25b

_, err = tx.Exec("SELECT pg_advisory_xact_lock(?)", MigrationLockID)
if err != nil {
return false, err
}

if err = ensureMigrationUpgrade(tx); err != nil {
return false, errors.Wrap(err, "error upgrading migration metadata")
}
Expand Down Expand Up @@ -186,6 +289,14 @@ func (db *PgDB) Migrate(migrationURL string, actions []string) (isNew bool, err
log.Infof("migrated from %d to %d", oldVersion, newVersion)
}

if newVersion >= 20240502203516 { // Only comes up in testing old data.
if needToUpdateDBCode {
if err := db.addDBCode(dbCodeFiles); err != nil {
return false, err
}
}
}

log.Info("DB migrations completed")
return oldVersion == 0, nil
}
36 changes: 36 additions & 0 deletions master/internal/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,48 @@ type PgDB struct {
URL string
}

func setSearchPath(sql *sqlx.DB) error {
// In integration tests, multiple processes can be running this code at once, which can lead to
// errors because PostgreSQL ALTER DATABASE can do weird things.
if testOnlyDBLock != nil {
cleanup := testOnlyDBLock(sql)
defer cleanup()
}

if _, err := sql.Exec(`DO $$
BEGIN
execute 'ALTER DATABASE "'||current_database()||'" SET SEARCH_PATH TO public,determined_code';
END
$$;`); err != nil {
return fmt.Errorf("setting search path on db connection: %w", err)
}

return nil
}

// ConnectPostgres connects to a Postgres database.
func ConnectPostgres(url string) (*PgDB, error) {
return connectPostgres(url, true)
}

func connectPostgres(url string, firstTime bool) (*PgDB, error) {
numTries := 0
for {
sql, err := sqlx.Connect("pgx", url)
if err == nil {
if firstTime {
// On first connection set the search path, close the connection and reconnect.
// There is a little bit of a chicken and egg problem with setting search path.
// We need to actually reconnect for this to take affect.
if err := setSearchPath(sql); err != nil {
return nil, err
}
if err := sql.Close(); err != nil {
return nil, err
}
return connectPostgres(url, false)
}

db := &PgDB{sql: sql, queries: &StaticQueryMap{}, URL: url}
initTheOneBun(db)
return db, nil
Expand Down
3 changes: 2 additions & 1 deletion master/internal/db/postgres_agent_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func agentStatsForRP(t *testing.T, rp string) []*agentStatsRow {

func TestRecordAgentStats(t *testing.T) {
require.NoError(t, etc.SetRootPath(RootFromDB))
db := MustResolveTestPostgres(t)
db, close := MustResolveTestPostgres(t)
defer close()
MustMigrateTestPostgres(t, db, MigrationsFromDB)

lowStartTimeBound := time.Now()
Expand Down
3 changes: 2 additions & 1 deletion master/internal/db/postgres_cluster_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
func TestClusterAPI(t *testing.T) {
require.NoError(t, etc.SetRootPath(RootFromDB))

db := MustResolveTestPostgres(t)
db, close := MustResolveTestPostgres(t)
defer close()
MustMigrateTestPostgres(t, db, MigrationsFromDB)

_, err := db.GetOrCreateClusterID("")
Expand Down
Loading
Loading