Skip to content

Commit

Permalink
feat(pipeline): add blob expiration time to run logs (#938)
Browse files Browse the repository at this point in the history
Because

- When the blob data associated to a pipeline / component run expires,
the backend will stop serving it. It will be useful to both backend and
clients to know that the data isn't there because it expired (instead of
because an error occurred).

This commit

- Adds a field with the blob data expiration data to the run entities.
  • Loading branch information
jvallesm authored Dec 16, 2024
1 parent 1f043f6 commit fa7ef0e
Show file tree
Hide file tree
Showing 15 changed files with 1,011 additions and 85 deletions.
118 changes: 86 additions & 32 deletions cmd/migration/main.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
package main

import (
"context"
"database/sql"
"fmt"
"os"

"github.com/golang-migrate/migrate/v4"
"go.uber.org/zap"

_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"

"github.com/instill-ai/pipeline-backend/config"
"github.com/instill-ai/pipeline-backend/pkg/db/migration"
"github.com/instill-ai/pipeline-backend/pkg/db/migration/convert/legacy"
"github.com/instill-ai/pipeline-backend/pkg/external"
"github.com/instill-ai/pipeline-backend/pkg/logger"
"github.com/instill-ai/pipeline-backend/pkg/service"

database "github.com/instill-ai/pipeline-backend/pkg/db"
)

var log *zap.Logger

func checkExist(databaseConfig config.DatabaseConfig) error {
db, err := sql.Open(
"postgres",
Expand All @@ -28,63 +37,62 @@ func checkExist(databaseConfig config.DatabaseConfig) error {
)

if err != nil {
panic(err)
return fmt.Errorf("opening database connection: %w", err)
}

defer db.Close()

// Open() may just validate its arguments without creating a connection to the database.
// To verify that the data source name is valid, call Ping().
// Open() may just validate its arguments without creating a connection to
// the database. To verify that the data source name is valid, call Ping().
if err = db.Ping(); err != nil {
panic(err)
return fmt.Errorf("pinging database: %w", err)
}

var rows *sql.Rows
rows, err = db.Query(fmt.Sprintf("SELECT datname FROM pg_catalog.pg_database WHERE lower(datname) = lower('%s');", databaseConfig.Name))

if err != nil {
panic(err)
return fmt.Errorf("executing database name query: %w", err)
}

dbExist := false
defer rows.Close()
for rows.Next() {
var databaseName string
if err := rows.Scan(&databaseName); err != nil {
panic(err)
return fmt.Errorf("scanning database name from row: %w", err)
}

if databaseConfig.Name == databaseName {
dbExist = true
fmt.Printf("Database %s exist\n", databaseName)
log.With(zap.String("name", databaseName)).Info("Database exists")
}
}

if err := rows.Err(); err != nil {
panic(err)
return fmt.Errorf("scanning rows: %w", err)
}

if !dbExist {
fmt.Printf("Create database %s\n", databaseConfig.Name)
log.With(zap.String("name", databaseConfig.Name)).Info("Create database")
if _, err := db.Exec(fmt.Sprintf("CREATE DATABASE %s;", databaseConfig.Name)); err != nil {
return err
return fmt.Errorf("creating database: %w", err)
}
}

return nil
}

func main() {
migrateFolder, _ := os.Getwd()
ctx := context.Background()
log, _ = logger.GetZapLogger(ctx)

if err := config.Init(config.ParseConfigFlag()); err != nil {
panic(err)
log.With(zap.Error(err)).Fatal("Loading configuration")
}

databaseConfig := config.Config.Database

if err := checkExist(databaseConfig); err != nil {
panic(err)
log.With(zap.Error(err)).Fatal("Checking database existence")
}

dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?%s",
Expand All @@ -96,59 +104,105 @@ func main() {
"sslmode=disable",
)

m, err := migrate.New(fmt.Sprintf("file:///%s/pkg/db/migration", migrateFolder), dsn)
codeMigrator, cleanup := initCodeMigrator(ctx)
defer cleanup()

if err := runMigration(dsn, databaseConfig.Version, codeMigrator.Migrate); err != nil {
log.With(zap.Error(err)).Fatal("Running migration")
}
}

func runMigration(
dsn string,
expectedVersion uint,
execCode func(version uint) error,
) error {
migrateFolder, err := os.Getwd()
if err != nil {
panic(err)
return fmt.Errorf("accessing base path: %w", err)
}

curVersion, dirty, err := m.Version()
m, err := migrate.New(fmt.Sprintf("file:///%s/pkg/db/migration", migrateFolder), dsn)
if err != nil {
return fmt.Errorf("creating migration: %w", err)
}

curVersion, dirty, err := m.Version()
if err != nil && curVersion != 0 {
panic(err)
return fmt.Errorf("getting current version: %w", err)
}

expectedVersion := databaseConfig.Version
log.With(
zap.Uint("expectedVersion", expectedVersion),
zap.Uint("currentVersion", curVersion),
zap.Bool("dirty", dirty),
).Info("Running migration")

fmt.Printf("Expected migration version is %d\n", expectedVersion)
fmt.Printf("The current schema version is %d, and dirty flag is %t\n", curVersion, dirty)
if dirty {
panic("The database has dirty flag, please fix it")
return fmt.Errorf("database is dirty, please fix it")
}

step := curVersion
for {
if expectedVersion <= step {
fmt.Printf("Migration to version %d complete\n", expectedVersion)
log.With(zap.Uint("expectedVersion", expectedVersion)).Info("Migration completed")
break
}

switch step {
case 5:
if err := legacy.MigratePipelineRecipeUp000006(); err != nil {
panic(err)
return fmt.Errorf("running legacy step 6: %w", err)
}
case 6:
if err := legacy.MigratePipelineRecipeUp000007(); err != nil {
panic(err)
return fmt.Errorf("running legacy step 7: %w", err)
}
case 11:
if err := legacy.MigratePipelineRecipeUp000012(); err != nil {
panic(err)
return fmt.Errorf("running legacy step 12: %w", err)
}
}

fmt.Printf("Step up to version %d\n", step+1)
log.With(zap.Uint("step", step+1)).Info("Step up")
if err := m.Steps(1); err != nil {
panic(err)
return fmt.Errorf("stepping up: %w", err)
}

if step, _, err = m.Version(); err != nil {
panic(err)
return fmt.Errorf("getting new version: %w", err)
}

if err := execCode(step); err != nil {
return fmt.Errorf("running associated code: %w", err)
}
}

return nil
}

func initCodeMigrator(ctx context.Context) (cm *migration.CodeMigrator, cleanup func()) {
l, _ := logger.GetZapLogger(ctx)
cleanups := make([]func(), 0)

rh := service.NewRetentionHandler()
mgmtPrivateServiceClient, mgmtPrivateServiceClientConn := external.InitMgmtPrivateServiceClient(ctx)
if mgmtPrivateServiceClientConn != nil {
cleanups = append(cleanups, func() { _ = mgmtPrivateServiceClientConn.Close() })
}

db := database.GetConnection().WithContext(ctx)
cleanups = append(cleanups, func() { database.Close(db) })
codeMigrator := &migration.CodeMigrator{
Logger: l,
DB: db,
RetentionHandler: rh,
MGMTPrivateServiceClient: mgmtPrivateServiceClient,
}

if err := migration.Migrate(step); err != nil {
panic(err)
return codeMigrator, func() {
for _, cleanup := range cleanups {
cleanup()
}
}
}
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ database:
host: pg-sql
port: 5432
name: pipeline
version: 38
version: 39
timezone: Etc/UTC
pool:
idleconnections: 5
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/h2non/filetype v1.1.3
github.com/iancoleman/strcase v0.3.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2
github.com/itchyny/gojq v0.12.14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1285,8 +1285,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498 h1:JYRfk/m+960jEScE1NPQM+xh+ScR4cHNMg/tCFwQbpI=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241129105617-c2c298e76498/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5 h1:5338ZeuB/C50P8aUOUKstjBSAQaWqmrdrAK2i9AbWk8=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20241213145904-c3d8111872b5/go.mod h1:rf0UY7VpEgpaLudYEcjx5rnbuwlBaaLyD4FQmWLtgAY=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/x v0.5.0-alpha.0.20241213094923-890bb310fcb2 h1:JHQpGbLn8ViRH3WNdOEt+smFkyhDJCs6U6hDbJ9suHA=
Expand Down
Loading

0 comments on commit fa7ef0e

Please sign in to comment.