diff --git a/backend/common/rpc/context.go b/backend/common/rpc/context.go index 4e6aed12f3..4cadc708a9 100644 --- a/backend/common/rpc/context.go +++ b/backend/common/rpc/context.go @@ -121,6 +121,9 @@ func (m *metadataInterceptor) WrapStreamingHandler(req connect.StreamingHandlerF } err = errors.WithStack(req(ctx, s)) if err != nil { + if connect.CodeOf(err) == connect.CodeCanceled { + return nil + } logger.Logf(m.errorLevel, "Streaming RPC failed: %s: %s", err, s.Spec().Procedure) return err } diff --git a/backend/common/rpc/rpc.go b/backend/common/rpc/rpc.go index 16b2b14878..c239e39e51 100644 --- a/backend/common/rpc/rpc.go +++ b/backend/common/rpc/rpc.go @@ -168,7 +168,9 @@ func RetryStreamingClientStream[Req, Resp any]( errored = true delay := retry.Duration() - logger.Logf(logLevel, "Stream handler failed, retrying in %s: %s", delay, err) + if !errors.Is(err, context.Canceled) { + logger.Logf(logLevel, "Stream handler failed, retrying in %s: %s", delay, err) + } select { case <-ctx.Done(): return diff --git a/backend/controller/scaling/local_scaling.go b/backend/controller/scaling/local_scaling.go index e9a69feb4a..2a1319f627 100644 --- a/backend/controller/scaling/local_scaling.go +++ b/backend/controller/scaling/local_scaling.go @@ -105,8 +105,8 @@ func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int, idleRunner go func() { logger.Infof("Starting runner: %s", config.Key) err := runner.Start(runnerCtx, config) - if err != nil { - logger.Errorf(err, "Error starting runner: %s", err) + if err != nil && !errors.Is(err, context.Canceled) { + logger.Errorf(err, "Runner failed: %s", err) } }() } diff --git a/cmd/ftl/cmd_serve.go b/cmd/ftl/cmd_serve.go index fbd8f4d4a0..88f6f36a12 100644 --- a/cmd/ftl/cmd_serve.go +++ b/cmd/ftl/cmd_serve.go @@ -15,11 +15,13 @@ import ( "github.com/TBD54566975/ftl/backend/common/exec" "github.com/TBD54566975/ftl/backend/common/log" "github.com/TBD54566975/ftl/backend/controller" + "github.com/TBD54566975/ftl/backend/controller/databasetesting" "github.com/TBD54566975/ftl/backend/controller/scaling" ) type serveCmd struct { Bind *url.URL `help:"Starting endpoint to bind to and advertise to. Each controller and runner will increment the port by 1" default:"http://localhost:8892"` + Recreate bool `help:"Recreate the database even if it already exists." default:"false"` Controllers int `short:"c" help:"Number of controllers to start." default:"1"` Runners int `short:"r" help:"Number of runners to start." default:"0"` } @@ -29,7 +31,7 @@ const ftlContainerName = "ftl-db" func (s *serveCmd) Run(ctx context.Context) error { logger := log.FromContext(ctx) - dsn, err := setupDB(ctx) + dsn, err := s.setupDB(ctx) if err != nil { return errors.WithStack(err) } @@ -82,7 +84,7 @@ func (s *serveCmd) Run(ctx context.Context) error { return nil } -func setupDB(ctx context.Context) (string, error) { +func (s *serveCmd) setupDB(ctx context.Context) (string, error) { logger := log.FromContext(ctx) logger.Infof("Checking for FTL database") @@ -93,7 +95,7 @@ func setupDB(ctx context.Context) (string, error) { return "", errors.WithStack(err) } - recreate := false + recreate := s.Recreate if len(output) == 0 { logger.Infof("Creating FTL database") @@ -132,14 +134,8 @@ func setupDB(ctx context.Context) (string, error) { } dsn := fmt.Sprintf("postgres://postgres:secret@localhost:%s/%s?sslmode=disable", strings.TrimSpace(string(port)), ftlContainerName) - dsnFlag := fmt.Sprintf("--dsn=%s", dsn) - if recreate { - logger.Infof("Initializing FTL schema") - err = exec.Command(ctx, logger.GetLevel(), ".", "ftl-initdb", "--recreate", dsnFlag).Run() - } else { - err = exec.Command(ctx, logger.GetLevel(), ".", "ftl-initdb", dsnFlag).Run() - } + _, err = databasetesting.CreateForDevel(ctx, dsn, recreate) if err != nil { return "", errors.WithStack(err) }