feat: change startup probe using healthcheck instead of delaying the server startup
gfyrag committed Dec 6, 2024
1 parent 13ea98a commit 57e71f1
Showing 26 changed files with 200 additions and 125 deletions.
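Before this change, `Driver.UpgradeAllBuckets` took a `minimalVersionReached` channel and the server delayed its startup until that channel was closed, i.e. until every bucket had reached the minimal workable schema version. This commit drops the channel and instead exposes `Driver.HasReachMinimalVersion`, so the server can start immediately and answer a startup probe through a healthcheck. The handler below is only a sketch of that wiring and is not part of this commit: the package, route, handler name, and interface are assumptions, while `HasReachMinimalVersion` is the method added to the driver in this diff.

```go
package health

import (
	"context"
	"net/http"
)

// minimalVersionChecker is the subset of the storage driver the probe needs;
// *driver.Driver satisfies it via the HasReachMinimalVersion method added in
// this commit.
type minimalVersionChecker interface {
	HasReachMinimalVersion(ctx context.Context) (bool, error)
}

// startupProbeHandler answers 200 once all buckets have reached the minimal
// workable schema version and 503 until then, so a startup probe can gate
// traffic without the server delaying its own startup.
func startupProbeHandler(d minimalVersionChecker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ok, err := d.HasReachMinimalVersion(r.Context())
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if !ok {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}
```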
2 changes: 1 addition & 1 deletion cmd/buckets_upgrade.go
@@ -30,7 +30,7 @@ func NewBucketUpgrade() *cobra.Command {
}()

if args[0] == "*" {
return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{}))
return driver.UpgradeAllBuckets(cmd.Context())
}

return driver.UpgradeBucket(cmd.Context(), args[0])
2 changes: 1 addition & 1 deletion cmd/root.go
@@ -44,7 +44,7 @@ func NewRootCommand() *cobra.Command {
_ = db.Close()
}()

return driver.UpgradeAllBuckets(cmd.Context(), make(chan struct{}))
return driver.UpgradeAllBuckets(cmd.Context())
}))
root.AddCommand(NewDocsCommand())

23 changes: 19 additions & 4 deletions docs/api/README.md
@@ -597,7 +597,10 @@ Accept: application/json
}
}
}
]
],
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}
```

@@ -3244,7 +3247,7 @@ Authorization ( Scopes: ledger:write )
|*anonymous*|INTERPRETER_PARSE|
|*anonymous*|INTERPRETER_RUNTIME|
|*anonymous*|LEDGER_ALREADY_EXISTS|
|*anonymous*|BUCKET_OUTDATED|
|*anonymous*|OUTDATED_SCHEMA|

<h2 id="tocS_V2LedgerInfoResponse">V2LedgerInfoResponse</h2>
<!-- backwards compatibility -->
@@ -3788,16 +3791,28 @@ and
}
}
}
]
],
"errorCode": "VALIDATION",
"errorMessage": "[VALIDATION] invalid 'cursor' query param",
"details": "https://play.numscript.org/?payload=eyJlcnJvciI6ImFjY291bnQgaGFkIGluc3VmZmljaWVudCBmdW5kcyJ9"
}

```

### Properties

allOf

|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|*anonymous*|object|false|none|none|
|» data|[[V2BulkElementResult](#schemav2bulkelementresult)]|false|none|none|

and

|Name|Type|Required|Restrictions|Description|
|---|---|---|---|---|
|data|[[V2BulkElementResult](#schemav2bulkelementresult)]|true|none|none|
|*anonymous*|[V2ErrorResponse](#schemav2errorresponse)|false|none|none|

<h2 id="tocS_V2BulkElementResult">V2BulkElementResult</h2>
<!-- backwards compatibility -->
1 change: 0 additions & 1 deletion internal/api/common/errors.go
@@ -17,7 +17,6 @@ const (
ErrMetadataOverride = "METADATA_OVERRIDE"
ErrBulkSizeExceeded = "BULK_SIZE_EXCEEDED"
ErrLedgerAlreadyExists = "LEDGER_ALREADY_EXISTS"
ErrBucketOutdated = "BUCKET_OUTDATED"

ErrInterpreterParse = "INTERPRETER_PARSE"
ErrInterpreterRuntime = "INTERPRETER_RUNTIME"
2 changes: 1 addition & 1 deletion internal/api/v2/controllers_ledgers_create.go
@@ -38,7 +38,7 @@ func createLedger(systemController system.Controller) http.HandlerFunc {
errors.Is(err, ledger.ErrInvalidBucketName{}):
api.BadRequest(w, common.ErrValidation, err)
case errors.Is(err, system.ErrBucketOutdated):
api.BadRequest(w, common.ErrBucketOutdated, err)
api.BadRequest(w, common.ErrOutdatedSchema, err)
case errors.Is(err, system.ErrLedgerAlreadyExists):
api.BadRequest(w, common.ErrLedgerAlreadyExists, err)
default:
2 changes: 1 addition & 1 deletion internal/api/v2/controllers_ledgers_create_test.go
@@ -84,7 +84,7 @@ func TestLedgersCreate(t *testing.T) {
expectedBackendCall: true,
returnErr: system.ErrBucketOutdated,
expectStatusCode: http.StatusBadRequest,
expectErrorCode: common.ErrBucketOutdated,
expectErrorCode: common.ErrOutdatedSchema,
},
{
name: "unexpected error",
2 changes: 1 addition & 1 deletion internal/storage/bucket/bucket.go
@@ -10,7 +10,7 @@ import (
)

type Bucket interface {
Migrate(ctx context.Context, minimalVersionReached chan struct{}, opts ...migrations.Option) error
Migrate(ctx context.Context, opts ...migrations.Option) error
AddLedger(ctx context.Context, ledger ledger.Ledger) error
HasMinimalVersion(ctx context.Context) (bool, error)
IsUpToDate(ctx context.Context) (bool, error)
4 changes: 2 additions & 2 deletions internal/storage/bucket/default_bucket.go
@@ -38,8 +38,8 @@ func (b *DefaultBucket) IsUpToDate(ctx context.Context) (bool, error) {
return GetMigrator(b.db, b.name).IsUpToDate(ctx)
}

func (b *DefaultBucket) Migrate(ctx context.Context, minimalVersionReached chan struct{}, options ...migrations.Option) error {
return migrate(ctx, b.tracer, b.db, b.name, minimalVersionReached, options...)
func (b *DefaultBucket) Migrate(ctx context.Context, options ...migrations.Option) error {
return migrate(ctx, b.tracer, b.db, b.name, options...)
}

func (b *DefaultBucket) HasMinimalVersion(ctx context.Context) (bool, error) {
2 changes: 1 addition & 1 deletion internal/storage/bucket/default_bucket_test.go
@@ -31,5 +31,5 @@ func TestBuckets(t *testing.T) {
require.NoError(t, system.Migrate(ctx, db))

b := bucket.NewDefault(db, noop.Tracer{}, name)
require.NoError(t, b.Migrate(ctx, make(chan struct{})))
require.NoError(t, b.Migrate(ctx))
}
22 changes: 1 addition & 21 deletions internal/storage/bucket/migrations.go
@@ -24,21 +24,11 @@ func GetMigrator(db *bun.DB, name string, options ...migrations.Option) *migrati
return migrator
}

func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string, minimalVersionReached chan struct{}, options ...migrations.Option) error {
func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string, options ...migrations.Option) error {
ctx, span := tracer.Start(ctx, "Migrate bucket")
defer span.End()

migrator := GetMigrator(db, name, options...)
version, err := migrator.GetLastVersion(ctx)
if err != nil {
if !errors.Is(err, migrations.ErrMissingVersionTable) {
return err
}
}

if version >= MinimalSchemaVersion {
close(minimalVersionReached)
}

for {
err := migrator.UpByOne(ctx)
@@ -48,15 +38,5 @@ func migrate(ctx context.Context, tracer trace.Tracer, db *bun.DB, name string,
}
return err
}
version++

if version >= MinimalSchemaVersion {
select {
case <-minimalVersionReached:
// already closed
default:
close(minimalVersionReached)
}
}
}
}
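With the channel gone, the migration loop above no longer signals when the minimal schema version is crossed; that information is instead read on demand through the bucket's existing `HasMinimalVersion`. The snippet below is only an illustration of such a check, built from the same helpers the removed code used (`GetMigrator`, `GetLastVersion`, `MinimalSchemaVersion`); the actual implementation is not part of this diff.

```go
// Illustrative sketch, assumed to live in package bucket next to migrations.go;
// the real HasMinimalVersion implementation is not shown in this commit.
func (b *DefaultBucket) HasMinimalVersion(ctx context.Context) (bool, error) {
	version, err := GetMigrator(b.db, b.name).GetLastVersion(ctx)
	if err != nil {
		if errors.Is(err, migrations.ErrMissingVersionTable) {
			// No migration has run yet, so the minimal version cannot be reached.
			return false, nil
		}
		return false, err
	}
	return version >= MinimalSchemaVersion, nil
}
```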
8 changes: 4 additions & 4 deletions internal/storage/driver/buckets_generated_test.go

Some generated files are not rendered by default.

58 changes: 27 additions & 31 deletions internal/storage/driver/driver.go
@@ -61,7 +61,6 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto

if err := b.Migrate(
ctx,
make(chan struct{}),
migrations.WithLockRetryInterval(d.migratorLockRetryInterval),
); err != nil {
return nil, fmt.Errorf("migrating bucket: %w", err)
@@ -159,20 +158,17 @@ func (d *Driver) GetLedger(ctx context.Context, name string) (*ledger.Ledger, er
func (d *Driver) UpgradeBucket(ctx context.Context, name string) error {
return d.bucketFactory.Create(name).Migrate(
ctx,
make(chan struct{}),
migrations.WithLockRetryInterval(d.migratorLockRetryInterval),
)
}

func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached chan struct{}) error {
func (d *Driver) UpgradeAllBuckets(ctx context.Context) error {

buckets, err := d.systemStore.GetDistinctBuckets(ctx)
if err != nil {
return fmt.Errorf("getting distinct buckets: %w", err)
}

sem := make(chan struct{}, len(buckets))

wp := pond.New(d.parallelBucketMigrations, len(buckets), pond.Context(ctx))

for _, bucketName := range buckets {
@@ -182,18 +178,13 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch
})
b := d.bucketFactory.Create(bucketName)

// copy semaphore to be able to nil it
sem := sem

l:
for {
minimalVersionReached := make(chan struct{})
errChan := make(chan error, 1)
go func() {
logger.Infof("Upgrading...")
errChan <- b.Migrate(
logging.ContextWithLogger(ctx, logger),
minimalVersionReached,
migrations.WithLockRetryInterval(d.migratorLockRetryInterval),
)
}()
@@ -213,40 +204,45 @@ func (d *Driver) UpgradeAllBuckets(ctx context.Context, minimalVersionReached ch
return
}
}
if sem != nil {
logger.Infof("Reached minimal workable version")
sem <- struct{}{}
}

logger.Info("Upgrade terminated")
return
case <-minimalVersionReached:
minimalVersionReached = nil
if sem != nil {
logger.Infof("Reached minimal workable version")
sem <- struct{}{}
sem = nil
}
}
}
}
})
}

for i := 0; i < len(buckets); i++ {
select {
case <-ctx.Done():
return ctx.Err()
case <-sem:
}
wp.StopAndWait()

return nil
}

func (d *Driver) HasReachMinimalVersion(ctx context.Context) (bool, error) {
isUpToDate, err := d.systemStore.IsUpToDate(ctx)
if err != nil {
return false, fmt.Errorf("checking if system store is up to date: %w", err)
}
if !isUpToDate {
return false, nil
}

logging.FromContext(ctx).Infof("All buckets have reached minimal workable version")
close(minimalVersionReached)
buckets, err := d.systemStore.GetDistinctBuckets(ctx)
if err != nil {
return false, fmt.Errorf("getting distinct buckets: %w", err)
}

wp.StopAndWait()
for _, b := range buckets {
hasMinimalVersion, err := d.bucketFactory.Create(b).HasMinimalVersion(ctx)
if err != nil {
return false, fmt.Errorf("checking if bucket '%s' is up to date: %w", b, err)
}
if !hasMinimalVersion {
return false, nil
}
}

return nil
return true, nil
}

func New(
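The rewritten `UpgradeAllBuckets` still blocks until every bucket is fully migrated, but it no longer signals when the minimal workable version is crossed; that is now observable only through `HasReachMinimalVersion`. For callers that are not behind a startup probe (tests, local bootstrap), a small polling helper can recover the old behaviour. This is a hypothetical sketch, not code from this commit; only `UpgradeAllBuckets` and `HasReachMinimalVersion` come from the diff above.

```go
// waitForMinimalVersion runs the upgrade in the background and polls the
// driver until the minimal schema version is reached or the context ends.
// Assumes imports of context, time, and the driver and logging packages.
func waitForMinimalVersion(ctx context.Context, d *driver.Driver) error {
	go func() {
		if err := d.UpgradeAllBuckets(ctx); err != nil {
			logging.FromContext(ctx).Errorf("upgrading buckets: %s", err)
		}
	}()

	for {
		ok, err := d.HasReachMinimalVersion(ctx)
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}
```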
(The remaining changed files are not shown.)
