Skip to content

Commit

Permalink
feat: make bucket migrations resilient to the failure of an individual bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Nov 22, 2024
1 parent 3c0b196 commit d0f6978
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 95 deletions.
8 changes: 5 additions & 3 deletions Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ tests:
CACHE --id go-mod-cache /go/pkg/mod
CACHE --id go-cache /root/.cache/go-build
RUN go install github.com/onsi/ginkgo/v2/ginkgo@latest
RUN apk add gcc musl-dev

COPY --dir --pass-args (+generate/*) .

ARG includeIntegrationTests="true"
ARG coverage=""
ARG debug=false
ARG additionalArgs=""

ENV DEBUG=$debug
ENV CGO_ENABLED=1 # required for -race
RUN apk add gcc musl-dev

LET goFlags="-race"
IF [ "$coverage" = "true" ]
Expand All @@ -90,10 +92,10 @@ tests:
IF [ "$includeIntegrationTests" = "true" ]
SET goFlags="$goFlags -tags it"
WITH DOCKER --load=postgres:15-alpine=+postgres
RUN go test $goFlags ./...
RUN go test $goFlags $additionalArgs ./...
END
ELSE
RUN go test $goFlags ./...
RUN go test $goFlags $additionalArgs ./...
END
IF [ "$coverage" = "true" ]
# as a special case, exclude files suffixed by debug.go
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10
github.com/bluele/gcache v0.0.2
github.com/dop251/goja v0.0.0-20241009100908-5f46f2705ca3
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121113152-18a3fc7aa771
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121194732-b79c48b683f2
github.com/formancehq/ledger/pkg/client v0.0.0-00010101000000-000000000000
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/cors v1.2.1
Expand All @@ -25,7 +25,7 @@ require (
github.com/jamiealquiza/tachymeter v2.0.0+incompatible
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/nats-io/nats.go v1.37.0
github.com/onsi/ginkgo/v2 v2.21.0
github.com/onsi/ginkgo/v2 v2.22.0
github.com/onsi/gomega v1.35.1
github.com/ory/dockertest/v3 v3.11.0
github.com/pborman/uuid v1.2.1
Expand Down Expand Up @@ -161,7 +161,7 @@ require (
github.com/rs/cors v1.11.1 // indirect
github.com/shirou/gopsutil/v4 v4.24.10 // indirect
github.com/shomali11/util v0.0.0-20220717175126-f0771b70947f // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sirupsen/logrus v1.9.3
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.9.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121113152-18a3fc7aa771 h1:iv1nF1Q+zEkMtc5HhSxNdCRQGAT0s+oCpW5SzyXbnT0=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121113152-18a3fc7aa771/go.mod h1:6vkHEfWEkDSPOv/G2o1Exxra3ouuYxRiCkznwKxTMHU=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121194732-b79c48b683f2 h1:JsacSDe6MYGCm04ZY/VJyYTUAWg8jhOBZm6AGyErF/I=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121194732-b79c48b683f2/go.mod h1:ZGHVcAC54ZbPgeoGKy/d31CHy94RMhHQlX+Vui15ST8=
github.com/formancehq/numscript v0.0.9-0.20241009144012-1150c14a1417 h1:LOd5hxnXDIBcehFrpW1OnXk+VSs0yJXeu1iAOO+Hji4=
github.com/formancehq/numscript v0.0.9-0.20241009144012-1150c14a1417/go.mod h1:btuSv05cYwi9BvLRxVs5zrunU+O1vTgigG1T6UsawcY=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down Expand Up @@ -266,8 +266,8 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM=
github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
github.com/onsi/ginkgo/v2 v2.22.0 h1:Yed107/8DjTr0lKCNt7Dn8yQ6ybuDRQoMGrNFKzMfHg=
github.com/onsi/ginkgo/v2 v2.22.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo=
github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4=
github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down
81 changes: 49 additions & 32 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/alitto/pond"
"github.com/formancehq/go-libs/v2/metadata"
"github.com/formancehq/go-libs/v2/migrations"
"github.com/formancehq/go-libs/v2/platform/postgres"
Expand All @@ -13,7 +14,6 @@ import (
noopmetrics "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
nooptracer "go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"
"time"

ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
Expand Down Expand Up @@ -155,38 +155,58 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch

sem := make(chan struct{}, len(buckets))

grp, ctx := errgroup.WithContext(ctx)
wp := pond.New(10, len(buckets), pond.Context(ctx))

for _, bucketName := range buckets {
grp.Go(func() error {
wp.Submit(func() {
logger := logging.FromContext(ctx).WithFields(map[string]any{
"bucket": bucketName,
})
b := d.bucketFactory.Create(bucketName)

minimalVersionReached := make(chan struct{})

go func() {
select {
case <-ctx.Done():
return
case <-minimalVersionReached:
logger.Infof("Reached minimal workable version")
sem <- struct{}{}
// keep a local copy of the semaphore so it can be set to nil once it has been signalled
sem := sem

l:
for {
minimalVersionReached := make(chan struct{})
errChan := make(chan error, 1)
go func() {
logger.Infof("Upgrading...")
errChan <- b.Migrate(
ctx,
minimalVersionReached,
migrations.WithLockRetryInterval(d.migratorLockRetryInterval),
)
}()

for {
logger.Infof("Waiting termination")
select {
case <-ctx.Done():
return
case err := <-errChan:
if err != nil {
logger.Errorf("Error upgrading: %s", err)
continue l
}
if sem != nil {
logger.Infof("Reached minimal workable version")
sem <- struct{}{}
}

logger.Info("Upgrade terminated")
return
case <-minimalVersionReached:
minimalVersionReached = nil
if sem != nil {
logger.Infof("Reached minimal workable version")
sem <- struct{}{}
sem = nil
}
}
}
}()

logger.Infof("Upgrading...")
if err := b.Migrate(
ctx,
minimalVersionReached,
migrations.WithLockRetryInterval(d.migratorLockRetryInterval),
); err != nil {
logger.Errorf("Error upgrading: %s", err)
return err
}
logging.Infof("Up to date")

return nil
})
}

Expand All @@ -199,14 +219,11 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch
}

logging.FromContext(ctx).Infof("All buckets have reached minimal workable version")
select {
case <-minimalVersionReached:
// already closed
default:
close(minimalVersionReached)
}
close(minimalVersionReached)

wp.StopAndWait()

return grp.Wait()
return nil
}

func New(
Expand Down
43 changes: 28 additions & 15 deletions internal/storage/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/formancehq/ledger/internal/storage/driver"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"testing"
"time"
)

func TestUpgradeAllLedgers(t *testing.T) {
Expand All @@ -41,7 +43,7 @@ func TestUpgradeAllLedgers(t *testing.T) {
bucket := driver.NewMockBucket(ctrl)

systemStore.EXPECT().
GetDistinctBuckets(ctx).
GetDistinctBuckets(gomock.Any()).
Return([]string{ledger.DefaultBucket}, nil)

bucketFactory.EXPECT().
Expand All @@ -56,6 +58,9 @@ func TestUpgradeAllLedgers(t *testing.T) {
return nil
})

ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
t.Cleanup(cancel)

require.NoError(t, d.UpgradeAllBuckets(ctx, make(chan struct{})))
})

Expand Down Expand Up @@ -96,25 +101,28 @@ func TestUpgradeAllLedgers(t *testing.T) {
}

systemStore.EXPECT().
GetDistinctBuckets(ctx).
GetDistinctBuckets(gomock.Any()).
Return(bucketList, nil)

ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
t.Cleanup(cancel)

require.NoError(t, d.UpgradeAllBuckets(ctx, make(chan struct{})))
})

t.Run("and error", func(t *testing.T) {
t.Parallel()

//ctx := context.Background()

ctx := logging.ContextWithLogger(ctx, logging.NewLogrus(logrus.New()))

ctrl := gomock.NewController(t)
ledgerStoreFactory := driver.NewLedgerStoreFactory(ctrl)
bucketFactory := driver.NewBucketFactory(ctrl)
systemStore := driver.NewSystemStore(ctrl)

d := driver.New(
ledgerStoreFactory,
systemStore,
bucketFactory,
)
d := driver.New(ledgerStoreFactory, systemStore, bucketFactory)

bucket1 := driver.NewMockBucket(ctrl)
bucket2 := driver.NewMockBucket(ctrl)
Expand All @@ -141,11 +149,11 @@ func TestUpgradeAllLedgers(t *testing.T) {
DoAndReturn(func(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error {
close(minimalVersionReached)
close(bucket1MigrationStarted)
<-ctx.Done()

return ctx.Err()
return nil
})

firstCall := true
bucket2.EXPECT().
Migrate(gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Expand All @@ -154,21 +162,26 @@ func TestUpgradeAllLedgers(t *testing.T) {
case <-ctx.Done():
return ctx.Err()
case <-bucket1MigrationStarted:
return errors.New("unknown error")
if firstCall {
firstCall = false
return errors.New("unknown error")
}
close(minimalVersionReached)
return nil
}
})

systemStore.EXPECT().
GetDistinctBuckets(ctx).
GetDistinctBuckets(gomock.Any()).
AnyTimes().
Return(bucketList, nil)

err := d.UpgradeAllBuckets(ctx, allBucketsMinimalVersionReached)
require.Error(t, err)
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
t.Cleanup(cancel)

bucket1MigrationStarted = make(chan struct{})
err = d.UpgradeAllBuckets(ctx, allBucketsMinimalVersionReached)
require.Error(t, err)
err := d.UpgradeAllBuckets(ctx, allBucketsMinimalVersionReached)
require.NoError(t, err)
})
})
}
Expand Down
35 changes: 5 additions & 30 deletions internal/storage/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storage

import (
"context"
"errors"
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/ledger/internal/storage/driver"
"go.uber.org/fx"
Expand All @@ -23,14 +22,13 @@ func NewFXModule(autoUpgrade bool) fx.Option {
)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
upgradeContext, cancelContext = context.WithCancel(logging.ContextWithLogger(
context.Background(),
logging.FromContext(ctx),
))
upgradeContext, cancelContext = context.WithCancel(context.WithoutCancel(ctx))
go func() {
defer close(upgradeStopped)

migrate(upgradeContext, driver, minimalVersionReached)
if err := driver.UpgradeAllBuckets(upgradeContext, minimalVersionReached); err != nil {
logging.FromContext(ctx).Errorf("failed to upgrade all buckets: %v", err)
}
}()

select {
Expand All @@ -54,27 +52,4 @@ func NewFXModule(autoUpgrade bool) fx.Option {
)
}
return fx.Options(ret...)
}

// migrate keeps calling driver.UpgradeAllBuckets until one of the following
// happens:
//   - ctx is already done when an attempt is about to start;
//   - UpgradeAllBuckets returns without error;
//   - UpgradeAllBuckets fails with context.Canceled or
//     context.DeadlineExceeded.
//
// Any other error is logged and the upgrade is attempted again immediately.
func migrate(ctx context.Context, driver *driver.Driver, minimalVersionReached chan struct{}) {
	for {
		// Non-blocking cancellation check before starting a new attempt.
		select {
		case <-ctx.Done():
			return
		default:
		}

		logging.FromContext(ctx).Infof("Upgrading buckets...")
		upgradeErr := driver.UpgradeAllBuckets(ctx, minimalVersionReached)

		switch {
		case upgradeErr == nil:
			// Every bucket has been upgraded, nothing left to do.
			return
		case errors.Is(upgradeErr, context.DeadlineExceeded),
			errors.Is(upgradeErr, context.Canceled):
			// A long migration can be interrupted before it completes (for
			// example when the app is rescheduled). Stop quietly instead of
			// panicking; the next start will try again.
			return
		default:
			logging.FromContext(ctx).Errorf("Upgrading buckets: %s", upgradeErr)
		}
	}
}
}
2 changes: 1 addition & 1 deletion test/rolling-upgrades/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ replace github.com/formancehq/ledger/pkg/client => ../../pkg/client
replace github.com/formancehq/ledger => ../..

require (
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121113152-18a3fc7aa771
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121194732-b79c48b683f2
github.com/formancehq/ledger v0.0.0-00010101000000-000000000000
github.com/pulumi/pulumi-kubernetes/sdk/v4 v4.12.0
github.com/pulumi/pulumi/sdk/v3 v3.117.0
Expand Down
4 changes: 2 additions & 2 deletions test/rolling-upgrades/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121113152-18a3fc7aa771 h1:iv1nF1Q+zEkMtc5HhSxNdCRQGAT0s+oCpW5SzyXbnT0=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121113152-18a3fc7aa771/go.mod h1:6vkHEfWEkDSPOv/G2o1Exxra3ouuYxRiCkznwKxTMHU=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121194732-b79c48b683f2 h1:JsacSDe6MYGCm04ZY/VJyYTUAWg8jhOBZm6AGyErF/I=
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121194732-b79c48b683f2/go.mod h1:ZGHVcAC54ZbPgeoGKy/d31CHy94RMhHQlX+Vui15ST8=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/gliderlabs/ssh v0.3.7 h1:iV3Bqi942d9huXnzEF2Mt+CY9gLu8DNM4Obd+8bODRE=
Expand Down
2 changes: 1 addition & 1 deletion tools/generator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ replace github.com/formancehq/ledger => ../..
replace github.com/formancehq/ledger/pkg/client => ../../pkg/client

require (
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121113152-18a3fc7aa771
github.com/formancehq/go-libs/v2 v2.0.1-0.20241121194732-b79c48b683f2
github.com/formancehq/ledger v0.0.0-00010101000000-000000000000
github.com/formancehq/ledger/pkg/client v0.0.0-00010101000000-000000000000
github.com/spf13/cobra v1.8.1
Expand Down
Loading

0 comments on commit d0f6978

Please sign in to comment.