Skip to content

Commit

Permalink
fix(migrations): with empty references
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 22, 2024
1 parent ebb8982 commit e69c795
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ require (
github.com/rs/cors v1.11.1 // indirect
github.com/shirou/gopsutil/v4 v4.24.10 // indirect
github.com/shomali11/util v0.0.0-20220717175126-f0771b70947f // indirect
github.com/sirupsen/logrus v1.9.3
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.9.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
create unique index concurrently transactions_reference2 on "{{.Schema}}".transactions (ledger, reference);
-- todo: clean empty reference in subsequent migration
create unique index concurrently transactions_reference2 on "{{.Schema}}".transactions (ledger, reference) where reference <> '';
38 changes: 30 additions & 8 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import (
)

type Driver struct {
ledgerStoreFactory ledgerstore.Factory
systemStore systemstore.Store
bucketFactory bucket.Factory
tracer trace.Tracer
meter metric.Meter
ledgerStoreFactory ledgerstore.Factory
systemStore systemstore.Store
bucketFactory bucket.Factory
tracer trace.Tracer
meter metric.Meter

migrationRetryPeriod time.Duration
migratorLockRetryInterval time.Duration
parallelBucketMigrations int
}

func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) {
Expand Down Expand Up @@ -155,7 +158,7 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch

sem := make(chan struct{}, len(buckets))

wp := pond.New(10, len(buckets), pond.Context(ctx))
wp := pond.New(d.parallelBucketMigrations, len(buckets), pond.Context(ctx))

for _, bucketName := range buckets {
wp.Submit(func() {
Expand All @@ -174,7 +177,7 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch
go func() {
logger.Infof("Upgrading...")
errChan <- b.Migrate(
ctx,
logging.ContextWithLogger(ctx, logger),
minimalVersionReached,
migrations.WithLockRetryInterval(d.migratorLockRetryInterval),
)
Expand All @@ -188,7 +191,12 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch
case err := <-errChan:
if err != nil {
logger.Errorf("Error upgrading: %s", err)
continue l
select {
case <-time.After(d.migrationRetryPeriod):
continue l
case <-ctx.Done():
return
}
}
if sem != nil {
logger.Infof("Reached minimal workable version")
Expand Down Expand Up @@ -263,7 +271,21 @@ func WithMigratorLockRetryInterval(interval time.Duration) Option {
}
}

func WithParallelBucketMigration(p int) Option {
return func(d *Driver) {
d.parallelBucketMigrations = p
}
}

func WithMigrationRetryPeriod(p time.Duration) Option {
return func(d *Driver) {
d.migrationRetryPeriod = p
}
}

var defaultOptions = []Option{
WithMeter(noopmetrics.Meter{}),
WithTracer(nooptracer.Tracer{}),
WithParallelBucketMigration(10),
WithMigrationRetryPeriod(5 * time.Second),
}
9 changes: 3 additions & 6 deletions internal/storage/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/formancehq/ledger/internal/storage/driver"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
Expand Down Expand Up @@ -113,16 +112,14 @@ func TestUpgradeAllLedgers(t *testing.T) {
t.Run("and error", func(t *testing.T) {
t.Parallel()

//ctx := context.Background()

ctx := logging.ContextWithLogger(ctx, logging.NewLogrus(logrus.New()))

ctrl := gomock.NewController(t)
ledgerStoreFactory := driver.NewLedgerStoreFactory(ctrl)
bucketFactory := driver.NewBucketFactory(ctrl)
systemStore := driver.NewSystemStore(ctrl)

d := driver.New(ledgerStoreFactory, systemStore, bucketFactory)
d := driver.New(ledgerStoreFactory, systemStore, bucketFactory,
driver.WithMigrationRetryPeriod(10*time.Millisecond),
)

bucket1 := driver.NewMockBucket(ctrl)
bucket2 := driver.NewMockBucket(ctrl)
Expand Down
23 changes: 19 additions & 4 deletions test/migrations/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@ import (
var (
sourceDatabase string
destinationDatabase string
skipCopy bool
skipMigrate bool
)

func TestMain(m *testing.M) {
flag.StringVar(&sourceDatabase, "databases.source", "", "Source database")
flag.StringVar(&destinationDatabase, "databases.destination", "", "Destination database")
flag.BoolVar(&skipCopy, "skip-copy", false, "Skip copying database")
flag.BoolVar(&skipMigrate, "skip-migrate", false, "Skip migrating database")
flag.Parse()

os.Exit(m.Run())
}

func TestMigrations(t *testing.T) {
if sourceDatabase == "" {
t.Skip()
}

ctx := logging.TestingContext()
dockerPool := docker.NewPool(t, logging.Testing())
Expand All @@ -46,18 +47,30 @@ func TestMigrations(t *testing.T) {
destinationDatabase = pgServer.GetDSN()
}

copyDatabase(t, dockerPool, sourceDatabase, destinationDatabase)
if !skipCopy {
if sourceDatabase == "" {
t.Skip()
}

copyDatabase(t, dockerPool, sourceDatabase, destinationDatabase)
fmt.Println("Database copied")
}

db, err := bunconnect.OpenSQLDB(ctx, bunconnect.ConnectionOptions{
DatabaseSourceName: destinationDatabase,
})
require.NoError(t, err)

if skipMigrate {
return
}

// Migrate database
driver := driver.New(
ledger.NewFactory(db),
systemstore.New(db),
bucket.NewDefaultFactory(db),
driver.WithParallelBucketMigration(1),
)
require.NoError(t, driver.Initialize(ctx))
require.NoError(t, driver.UpgradeAllBuckets(ctx, make(chan struct{})))
Expand Down Expand Up @@ -111,6 +124,7 @@ func preparePGDumpCommand(t *testing.T, dsn string) string {
"-x", // Skip privileges
"-h", parsedSource.Hostname(),
"-p", parsedSource.Port(),
"-v",
)

if username := parsedSource.User.Username(); username != "" {
Expand All @@ -133,6 +147,7 @@ func preparePSQLCommand(t *testing.T, dsn string) string {

args = append(args,
"psql",
"--echo-all",
"-h", parsedSource.Hostname(),
"-p", parsedSource.Port(),
parsedSource.Path[1:],
Expand Down

0 comments on commit e69c795

Please sign in to comment.