diff --git a/go.mod b/go.mod index 52290d3f2..cf7f1d2b9 100644 --- a/go.mod +++ b/go.mod @@ -161,7 +161,7 @@ require ( github.com/rs/cors v1.11.1 // indirect github.com/shirou/gopsutil/v4 v4.24.10 // indirect github.com/shomali11/util v0.0.0-20220717175126-f0771b70947f // indirect - github.com/sirupsen/logrus v1.9.3 + github.com/sirupsen/logrus v1.9.3 // indirect github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.9.0 // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect diff --git a/internal/storage/bucket/migrations/14-transaction-reference-index/up.sql b/internal/storage/bucket/migrations/14-transaction-reference-index/up.sql index 98fa61296..ab3e87586 100644 --- a/internal/storage/bucket/migrations/14-transaction-reference-index/up.sql +++ b/internal/storage/bucket/migrations/14-transaction-reference-index/up.sql @@ -1 +1,2 @@ -create unique index concurrently transactions_reference2 on "{{.Schema}}".transactions (ledger, reference); \ No newline at end of file +-- todo: clean empty reference in subsequent migration +create unique index concurrently transactions_reference2 on "{{.Schema}}".transactions (ledger, reference) where reference <> ''; \ No newline at end of file diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 52ca88cba..175af6959 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -26,12 +26,15 @@ import ( ) type Driver struct { - ledgerStoreFactory ledgerstore.Factory - systemStore systemstore.Store - bucketFactory bucket.Factory - tracer trace.Tracer - meter metric.Meter + ledgerStoreFactory ledgerstore.Factory + systemStore systemstore.Store + bucketFactory bucket.Factory + tracer trace.Tracer + meter metric.Meter + + migrationRetryPeriod time.Duration migratorLockRetryInterval time.Duration + parallelBucketMigrations int } func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) { @@ -155,7 +158,7 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch sem := make(chan struct{}, len(buckets)) - wp := pond.New(10, len(buckets), pond.Context(ctx)) + wp := pond.New(d.parallelBucketMigrations, len(buckets), pond.Context(ctx)) for _, bucketName := range buckets { wp.Submit(func() { @@ -174,7 +177,7 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch go func() { logger.Infof("Upgrading...") errChan <- b.Migrate( - ctx, + logging.ContextWithLogger(ctx, logger), minimalVersionReached, migrations.WithLockRetryInterval(d.migratorLockRetryInterval), ) @@ -188,7 +191,12 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch case err := <-errChan: if err != nil { logger.Errorf("Error upgrading: %s", err) - continue l + select { + case <-time.After(d.migrationRetryPeriod): + continue l + case <-ctx.Done(): + return + } } if sem != nil { logger.Infof("Reached minimal workable version") @@ -263,7 +271,21 @@ func WithMigratorLockRetryInterval(interval time.Duration) Option { } } +func WithParallelBucketMigration(p int) Option { + return func(d *Driver) { + d.parallelBucketMigrations = p + } +} + +func WithMigrationRetryPeriod(p time.Duration) Option { + return func(d *Driver) { + d.migrationRetryPeriod = p + } +} + var defaultOptions = []Option{ WithMeter(noopmetrics.Meter{}), WithTracer(nooptracer.Tracer{}), + WithParallelBucketMigration(10), + WithMigrationRetryPeriod(5 * time.Second), } diff --git a/internal/storage/driver/driver_test.go b/internal/storage/driver/driver_test.go index 6c2578bf4..a0e0a5811 100644 --- a/internal/storage/driver/driver_test.go +++ b/internal/storage/driver/driver_test.go @@ -14,7 +14,6 @@ import ( "github.com/formancehq/ledger/internal/storage/driver" ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" "github.com/google/uuid" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "testing" @@ -113,16 +112,14 @@ func TestUpgradeAllLedgers(t *testing.T) { t.Run("and error", func(t *testing.T) { t.Parallel() - //ctx := context.Background() - - ctx := logging.ContextWithLogger(ctx, logging.NewLogrus(logrus.New())) - ctrl := gomock.NewController(t) ledgerStoreFactory := driver.NewLedgerStoreFactory(ctrl) bucketFactory := driver.NewBucketFactory(ctrl) systemStore := driver.NewSystemStore(ctrl) - d := driver.New(ledgerStoreFactory, systemStore, bucketFactory) + d := driver.New(ledgerStoreFactory, systemStore, bucketFactory, + driver.WithMigrationRetryPeriod(10*time.Millisecond), + ) bucket1 := driver.NewMockBucket(ctrl) bucket2 := driver.NewMockBucket(ctrl) diff --git a/test/migrations/upgrade_test.go b/test/migrations/upgrade_test.go index a4ef5ccf8..d43dbf7e0 100644 --- a/test/migrations/upgrade_test.go +++ b/test/migrations/upgrade_test.go @@ -23,20 +23,21 @@ import ( var ( sourceDatabase string destinationDatabase string + skipCopy bool + skipMigrate bool ) func TestMain(m *testing.M) { flag.StringVar(&sourceDatabase, "databases.source", "", "Source database") flag.StringVar(&destinationDatabase, "databases.destination", "", "Destination database") + flag.BoolVar(&skipCopy, "skip-copy", false, "Skip copying database") + flag.BoolVar(&skipMigrate, "skip-migrate", false, "Skip migrating database") flag.Parse() os.Exit(m.Run()) } func TestMigrations(t *testing.T) { - if sourceDatabase == "" { - t.Skip() - } ctx := logging.TestingContext() dockerPool := docker.NewPool(t, logging.Testing()) @@ -46,18 +47,30 @@ func TestMigrations(t *testing.T) { destinationDatabase = pgServer.GetDSN() } - copyDatabase(t, dockerPool, sourceDatabase, destinationDatabase) + if !skipCopy { + if sourceDatabase == "" { + t.Skip() + } + + copyDatabase(t, dockerPool, sourceDatabase, destinationDatabase) + fmt.Println("Database copied") + } db, err := bunconnect.OpenSQLDB(ctx, bunconnect.ConnectionOptions{ DatabaseSourceName: destinationDatabase, }) require.NoError(t, err) + if skipMigrate { + return + } + // Migrate database driver := driver.New( ledger.NewFactory(db), systemstore.New(db), bucket.NewDefaultFactory(db), + driver.WithParallelBucketMigration(1), ) require.NoError(t, driver.Initialize(ctx)) require.NoError(t, driver.UpgradeAllBuckets(ctx, make(chan struct{}))) @@ -111,6 +124,7 @@ func preparePGDumpCommand(t *testing.T, dsn string) string { "-x", // Skip privileges "-h", parsedSource.Hostname(), "-p", parsedSource.Port(), + "-v", ) if username := parsedSource.User.Username(); username != "" { @@ -133,6 +147,7 @@ func preparePSQLCommand(t *testing.T, dsn string) string { args = append(args, "psql", + "--echo-all", "-h", parsedSource.Hostname(), "-p", parsedSource.Port(), parsedSource.Path[1:],