Skip to content

Commit

Permalink
feat: Optimize migrations (#603)
Browse files Browse the repository at this point in the history
* wip

* feat: add configurable timeout on generator image

* feat: change startup probe using healthcheck instead of delaying the server startup

* fix: move migration order to create an index concurrently

* fix: blocking migration

* fix: slow migration

* fix: cache storage migration healthcheck result

* fix: pg_attribute bloated

* chore: change batch sizes for migrations
  • Loading branch information
gfyrag authored Dec 9, 2024
1 parent 2e3bb90 commit bf3c12c
Show file tree
Hide file tree
Showing 65 changed files with 460 additions and 291 deletions.
2 changes: 1 addition & 1 deletion cmd/buckets_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewBucketUpgrade() *cobra.Command {
}()

if args[0] == "*" {
return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{}))
return driver.UpgradeAllBuckets(cmd.Context())
}

return driver.UpgradeBucket(cmd.Context(), args[0])
Expand Down
24 changes: 16 additions & 8 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package cmd

import (
"github.com/formancehq/go-libs/v2/bun/bunmigrate"
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/formancehq/ledger/internal/storage/driver"
"github.com/formancehq/ledger/internal/storage/ledger"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/uptrace/bun"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -34,17 +39,20 @@ func NewRootCommand() *cobra.Command {
root.AddCommand(serve)
root.AddCommand(buckets)
root.AddCommand(version)
root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, _ *bun.DB) error {
// todo: use provided db ...
driver, db, err := getDriver(cmd)
if err != nil {
root.AddCommand(bunmigrate.NewDefaultCommand(func(cmd *cobra.Command, _ []string, db *bun.DB) error {
logger := logging.NewDefaultLogger(cmd.OutOrStdout(), service.IsDebug(cmd), false, false)
cmd.SetContext(logging.ContextWithLogger(cmd.Context(), logger))

driver := driver.New(
ledger.NewFactory(db),
systemstore.New(db),
bucket.NewDefaultFactory(db),
)
if err := driver.Initialize(cmd.Context()); err != nil {
return err
}
defer func() {
_ = db.Close()
}()

return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{}))
return driver.UpgradeAllBuckets(cmd.Context())
}))
root.AddCommand(NewDocsCommand())

Expand Down
16 changes: 6 additions & 10 deletions deployments/pulumi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/formancehq/ledger/deployments/pulumi/pkg"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi"
"github.com/pulumi/pulumi/sdk/v3/go/pulumi/config"
"github.com/pulumi/pulumi/sdk/v3/go/pulumix"
)

func main() {
Expand Down Expand Up @@ -35,23 +36,18 @@ func deploy(ctx *pulumi.Context) error {
}
}

debug, _ := conf.TryBool("debug")
imagePullPolicy, _ := conf.Try("image.pullPolicy")

replicaCount, _ := conf.TryInt("replicaCount")
experimentalFeatures, _ := conf.TryBool("experimentalFeatures")

_, err = pulumi_ledger.NewComponent(ctx, "ledger", &pulumi_ledger.ComponentArgs{
Namespace: pulumi.String(namespace),
Timeout: pulumi.Int(timeout),
Tag: pulumi.String(version),
ImagePullPolicy: pulumi.String(imagePullPolicy),
ImagePullPolicy: pulumi.String(conf.Get("image.pullPolicy")),
Postgres: pulumi_ledger.PostgresArgs{
URI: pulumi.String(postgresURI),
},
Debug: pulumi.Bool(debug),
ReplicaCount: pulumi.Int(replicaCount),
ExperimentalFeatures: pulumi.Bool(experimentalFeatures),
Debug: pulumi.Bool(conf.GetBool("debug")),
ReplicaCount: pulumi.Int(conf.GetInt("replicaCount")),
ExperimentalFeatures: pulumi.Bool(conf.GetBool("experimentalFeatures")),
Upgrade: pulumix.Val(pulumi_ledger.UpgradeMode(config.Get(ctx, "upgrade-mode"))),
})

return err
Expand Down
75 changes: 48 additions & 27 deletions deployments/pulumi/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,59 @@ import (

func TestProgram(t *testing.T) {

ctx := logging.TestingContext()
stackName := "ledger-tests-pulumi-" + uuid.NewString()[:8]
type testCase struct {
name string
config map[string]string
}
for _, tc := range []testCase{
{
name: "nominal",
config: map[string]string{
"timeout": "30",
},
},
{
name: "upgrade using a job",
config: map[string]string{
"timeout": "30",
"upgrade-mode": "job",
},
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := logging.TestingContext()
stackName := "ledger-tests-pulumi-" + uuid.NewString()[:8]

stack, err := auto.UpsertStackInlineSource(ctx, stackName, "ledger-tests-pulumi-postgres", deployPostgres(stackName))
require.NoError(t, err)
stack, err := auto.UpsertStackInlineSource(ctx, stackName, "ledger-tests-pulumi-postgres", deployPostgres(stackName))
require.NoError(t, err)

t.Log("Deploy pg stack")
up, err := stack.Up(ctx, optup.ProgressStreams(os.Stdout), optup.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)
t.Log("Deploy pg stack")
up, err := stack.Up(ctx, optup.ProgressStreams(os.Stdout), optup.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)

t.Cleanup(func() {
t.Log("Destroy stack")
_, err := stack.Destroy(ctx, optdestroy.Remove(), optdestroy.ProgressStreams(os.Stdout), optdestroy.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)
})
t.Cleanup(func() {
t.Log("Destroy stack")
_, err := stack.Destroy(ctx, optdestroy.Remove(), optdestroy.ProgressStreams(os.Stdout), optdestroy.ErrorProgressStreams(os.Stderr))
require.NoError(t, err)
})

postgresURI := up.Outputs["uri"].Value.(string)
postgresURI := up.Outputs["uri"].Value.(string)

t.Log("Test program")
integration.ProgramTest(t, &integration.ProgramTestOptions{
Quick: true,
SkipRefresh: true,
Dir: ".",
Config: map[string]string{
"namespace": stackName,
"postgres.uri": postgresURI,
"timeout": "30",
},
Stdout: os.Stdout,
Stderr: os.Stderr,
Verbose: testing.Verbose(),
})
tc.config["postgres.uri"] = postgresURI
tc.config["namespace"] = stackName

t.Log("Test program")
integration.ProgramTest(t, &integration.ProgramTestOptions{
Quick: true,
SkipRefresh: true,
Dir: ".",
Config: tc.config,
Stdout: os.Stdout,
Stderr: os.Stderr,
Verbose: testing.Verbose(),
})
})
}
}

func deployPostgres(stackName string) func(ctx *pulumi.Context) error {
Expand Down
78 changes: 47 additions & 31 deletions deployments/pulumi/pkg/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@ import (

var ErrPostgresURIRequired = fmt.Errorf("postgresURI is required")

// UpgradeMode selects how NewComponent arranges for upgrades to be run.
// NewComponent falls back to UpgradeModeInApp when no mode (or an empty one)
// is supplied, and returns an "invalid upgrade mode" error for any value other
// than the constants declared below.
type UpgradeMode string

const (
// UpgradeModeDisabled matches neither the in-app nor the job branch in the
// parts of NewComponent shown here — presumably nothing triggers an upgrade
// in this mode; confirm against the full function.
UpgradeModeDisabled UpgradeMode = "disabled"
// UpgradeModeJob makes NewComponent create a batch Job named "migrate" whose
// container is started with the "migrate" argument.
UpgradeModeJob UpgradeMode = "job"
// UpgradeModeInApp makes NewComponent append AUTO_UPGRADE=true to the
// environment variables passed to the deployed container.
UpgradeModeInApp UpgradeMode = "in-app"
)

type Component struct {
pulumi.ResourceState

ServiceName pulumix.Output[string]
ServiceNamespace pulumix.Output[string]
ServicePort pulumix.Output[int]
ServiceInternalURL pulumix.Output[string]
Migrations pulumix.Output[*batchv1.Job]
}

type PostgresArgs struct {
Expand Down Expand Up @@ -73,13 +80,12 @@ type ComponentArgs struct {
Debug pulumix.Input[bool]
ReplicaCount pulumix.Input[int]
GracePeriod pulumix.Input[string]
AutoUpgrade pulumix.Input[bool]
WaitUpgrade pulumix.Input[bool]
BallastSizeInBytes pulumix.Input[int]
NumscriptCacheMaxCount pulumix.Input[int]
BulkMaxSize pulumix.Input[int]
BulkParallel pulumix.Input[int]
TerminationGracePeriodSeconds pulumix.Input[*int]
Upgrade pulumix.Input[UpgradeMode]

ExperimentalFeatures pulumix.Input[bool]
ExperimentalNumscriptInterpreter pulumix.Input[bool]
Expand Down Expand Up @@ -129,14 +135,29 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
}
ledgerImage := pulumi.Sprintf("ghcr.io/formancehq/ledger:%s", tag)

autoUpgrade := pulumix.Val(true)
if args.AutoUpgrade != nil {
autoUpgrade = args.AutoUpgrade.ToOutput(ctx.Context())
upgradeMode := UpgradeModeInApp
if args.Upgrade != nil {
var (
upgradeModeChan = make(chan UpgradeMode, 1)
)
pulumix.ApplyErr(args.Upgrade, func(upgradeMode UpgradeMode) (any, error) {
upgradeModeChan <- upgradeMode
close(upgradeModeChan)
return nil, nil
})

select {
case <-ctx.Context().Done():
return nil, ctx.Context().Err()
case upgradeMode = <-upgradeModeChan:
if upgradeMode == "" {
upgradeMode = UpgradeModeInApp
}
}
}

waitUpgrade := pulumix.Val(true)
if args.WaitUpgrade != nil {
waitUpgrade = args.WaitUpgrade.ToOutput(ctx.Context())
if upgradeMode != "" && upgradeMode != UpgradeModeDisabled && upgradeMode != UpgradeModeJob && upgradeMode != UpgradeModeInApp {
return nil, fmt.Errorf("invalid upgrade mode: %s", upgradeMode)
}

imagePullPolicy := pulumix.Val("")
Expand Down Expand Up @@ -351,18 +372,10 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
})
}

if args.AutoUpgrade != nil {
if upgradeMode == UpgradeModeInApp {
envVars = append(envVars, corev1.EnvVarArgs{
Name: pulumi.String("AUTO_UPGRADE"),
Value: pulumix.Apply2Err(autoUpgrade, waitUpgrade, func(autoUpgrade, waitUpgrade bool) (string, error) {
if waitUpgrade && !autoUpgrade {
return "", fmt.Errorf("waitUpgrade requires autoUpgrade to be true")
}
if !autoUpgrade {
return "false", nil
}
return "true", nil
}).Untyped().(pulumi.StringOutput),
Name: pulumi.String("AUTO_UPGRADE"),
Value: pulumi.String("true"),
})
}

Expand Down Expand Up @@ -472,23 +485,26 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
Port: pulumi.String("http"),
},
FailureThreshold: pulumi.Int(1),
PeriodSeconds: pulumi.Int(10),
PeriodSeconds: pulumi.Int(60),
TimeoutSeconds: pulumi.IntPtr(3),
},
ReadinessProbe: corev1.ProbeArgs{
HttpGet: corev1.HTTPGetActionArgs{
Path: pulumi.String("/_healthcheck"),
Port: pulumi.String("http"),
},
FailureThreshold: pulumi.Int(1),
PeriodSeconds: pulumi.Int(10),
PeriodSeconds: pulumi.Int(60),
TimeoutSeconds: pulumi.IntPtr(3),
},
StartupProbe: corev1.ProbeArgs{
HttpGet: corev1.HTTPGetActionArgs{
Path: pulumi.String("/_healthcheck"),
Port: pulumi.String("http"),
},
FailureThreshold: pulumi.Int(60),
PeriodSeconds: pulumi.Int(5),
PeriodSeconds: pulumi.Int(5),
InitialDelaySeconds: pulumi.IntPtr(2),
TimeoutSeconds: pulumi.IntPtr(3),
},
Env: envVars,
},
Expand All @@ -501,11 +517,8 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
return nil, err
}

cmp.Migrations = pulumix.ApplyErr(waitUpgrade, func(waitUpgrade bool) (*batchv1.Job, error) {
if !waitUpgrade {
return nil, nil
}
return batchv1.NewJob(ctx, "wait-migration-completion", &batchv1.JobArgs{
if upgradeMode == UpgradeModeJob {
_, err = batchv1.NewJob(ctx, "migrate", &batchv1.JobArgs{
Metadata: &metav1.ObjectMetaArgs{
Namespace: namespace.Untyped().(pulumi.StringOutput),
},
Expand All @@ -515,7 +528,7 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
RestartPolicy: pulumi.String("OnFailure"),
Containers: corev1.ContainerArray{
corev1.ContainerArgs{
Name: pulumi.String("check"),
Name: pulumi.String("migrate"),
Args: pulumi.StringArray{
pulumi.String("migrate"),
},
Expand All @@ -537,7 +550,10 @@ func NewComponent(ctx *pulumi.Context, name string, args *ComponentArgs, opts ..
},
},
}, pulumi.Parent(cmp))
})
if err != nil {
return nil, err
}
}

service, err := corev1.NewService(ctx, "ledger", &corev1.ServiceArgs{
Metadata: &metav1.ObjectMetaArgs{
Expand Down
23 changes: 19 additions & 4 deletions docs/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,10 @@ Accept: application/json
}
}
}
]
],
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}
```

Expand Down Expand Up @@ -3244,7 +3247,7 @@ Authorization ( Scopes: ledger:write )
|*anonymous*|INTERPRETER_PARSE|
|*anonymous*|INTERPRETER_RUNTIME|
|*anonymous*|LEDGER_ALREADY_EXISTS|
|*anonymous*|BUCKET_OUTDATED|
|*anonymous*|OUTDATED_SCHEMA|

<h2 id="tocS_V2LedgerInfoResponse">V2LedgerInfoResponse</h2>
<!-- backwards compatibility -->
Expand Down Expand Up @@ -3788,16 +3791,28 @@ and
}
}
}
]
],
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}

```

### Properties

allOf

|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|*anonymous*|object|false|none|none|
|» data|[[V2BulkElementResult](#schemav2bulkelementresult)]|false|none|none|

and

|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|data|[[V2BulkElementResult](#schemav2bulkelementresult)]|true|none|none|
|*anonymous*|[V2ErrorResponse](#schemav2errorresponse)|false|none|none|

<h2 id="tocS_V2BulkElementResult">V2BulkElementResult</h2>
<!-- backwards compatibility -->
Expand Down
1 change: 0 additions & 1 deletion internal/api/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const (
ErrMetadataOverride = "METADATA_OVERRIDE"
ErrBulkSizeExceeded = "BULK_SIZE_EXCEEDED"
ErrLedgerAlreadyExists = "LEDGER_ALREADY_EXISTS"
ErrBucketOutdated = "BUCKET_OUTDATED"

ErrInterpreterParse = "INTERPRETER_PARSE"
ErrInterpreterRuntime = "INTERPRETER_RUNTIME"
Expand Down
Loading

0 comments on commit bf3c12c

Please sign in to comment.