diff --git a/frontend/cli/cmd_dev.go b/frontend/cli/cmd_dev.go index a146f52417..2aea8869df 100644 --- a/frontend/cli/cmd_dev.go +++ b/frontend/cli/cmd_dev.go @@ -106,7 +106,7 @@ func (d *devCmd) Run(ctx context.Context, k *kong.Kong, projConfig projectconfig return err } if d.languageServer != nil { - d.languageServer.Subscribe(ctx, engine.BuildUpdates) + d.languageServer.Subscribe(ctx, engine.EngineUpdates) } return engine.Dev(ctx, d.Watch) }) diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index d9586f0ec5..ac68f7ecf5 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -87,7 +87,6 @@ func (EngineEnded) buildEvent() {} // //sumtype:decl type rawEngineEvent interface { - EngineEvent rawBuildEvent() } @@ -180,11 +179,11 @@ type Engine struct { // events coming in from plugins pluginEvents chan languageplugin.PluginEvent - // internal channel for build updates - rawBuildUpdates chan rawEngineEvent + // internal channel for raw engine updates (does not include all state changes) + rawEngineUpdates chan rawEngineEvent // topic to subscribe to engine events - BuildUpdates *pubsub.Topic[EngineEvent] + EngineUpdates *pubsub.Topic[EngineEvent] } type Option func(o *Engine) @@ -235,8 +234,8 @@ func New(ctx context.Context, client DeployClient, projectRoot string, moduleDir pluginEvents: make(chan languageplugin.PluginEvent, 128), parallelism: runtime.NumCPU(), modulesToBuild: xsync.NewMapOf[string, bool](), - rawBuildUpdates: make(chan rawEngineEvent, 128), - BuildUpdates: pubsub.New[EngineEvent](), + rawEngineUpdates: make(chan rawEngineEvent, 128), + EngineUpdates: pubsub.New[EngineEvent](), } for _, option := range options { option(e) @@ -250,7 +249,7 @@ func New(ctx context.Context, client DeployClient, projectRoot string, moduleDir return nil, fmt.Errorf("failed to clean stubs: %w", err) } - updateTerminalWithEngineEvents(ctx, e.BuildUpdates) + updateTerminalWithEngineEvents(ctx, e.EngineUpdates) go e.watchForAutoRebuilds(ctx) go e.watchForEventsToPublish(ctx) @@ -273,7 +272,7 @@ func New(ctx context.Context, client DeployClient, projectRoot string, moduleDir } e.moduleMetas.Store(config.Module, meta) e.modulesToBuild.Store(config.Module, true) - e.rawBuildUpdates <- ModuleAdded{Module: config.Module} + e.rawEngineUpdates <- ModuleAdded{Module: config.Module} return nil }) } @@ -433,13 +432,13 @@ func (e *Engine) Deploy(ctx context.Context, replicas int32, waitForDeployOnline if len(meta.module.Deploy) == 0 { return fmt.Errorf("no files found to deploy for %q", moduleName) } - e.rawBuildUpdates <- ModuleDeployStarted{Module: moduleName} + e.rawEngineUpdates <- ModuleDeployStarted{Module: moduleName} err := Deploy(ctx, meta.module, meta.module.Deploy, replicas, waitForDeployOnline, e.client) if err != nil { - e.rawBuildUpdates <- ModuleDeployFailed{Module: moduleName, Error: err} + e.rawEngineUpdates <- ModuleDeployFailed{Module: moduleName, Error: err} return err } - e.rawBuildUpdates <- ModuleDeploySuccess{Module: moduleName} + e.rawEngineUpdates <- ModuleDeploySuccess{Module: moduleName} return nil }) } @@ -521,7 +520,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration continue } e.moduleMetas.Store(config.Module, meta) - e.rawBuildUpdates <- ModuleAdded{Module: config.Module} + e.rawEngineUpdates <- ModuleAdded{Module: config.Module} _ = e.BuildAndDeploy(ctx, 1, true, config.Module) //nolint:errcheck } case watch.WatchEventModuleRemoved: @@ -537,7 +536,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration } } e.moduleMetas.Delete(event.Config.Module) - e.rawBuildUpdates <- ModuleRemoved{Module: event.Config.Module} + e.rawEngineUpdates <- ModuleRemoved{Module: event.Config.Module} case watch.WatchEventModuleChanged: // ftl.toml file has changed meta, ok := e.moduleMetas.Load(event.Config.Module) @@ -632,13 +631,13 @@ func (e *Engine) watchForEventsToPublish(ctx context.Context) { publicBuildErrors := map[string]error{} maps.Copy(moduleErrors, publicBuildErrors) - e.BuildUpdates.Publish(EngineEnded{ModuleErrors: publicBuildErrors}) + e.EngineUpdates.Publish(EngineEnded{ModuleErrors: publicBuildErrors}) - case rawEvent := <-e.rawBuildUpdates: + case rawEvent := <-e.rawEngineUpdates: switch event := rawEvent.(type) { case ModuleAdded: - e.BuildUpdates.Publish(event) + e.EngineUpdates.Publish(event) case ModuleRemoved: delete(moduleErrors, event.Module) delete(explicitlyBuilding, event.Module) @@ -646,7 +645,7 @@ func (e *Engine) watchForEventsToPublish(ctx context.Context) { case ModuleBuildStarted: if isIdle { isIdle = false - e.BuildUpdates.Publish(EngineStarted{}) + e.EngineUpdates.Publish(EngineStarted{}) } if event.IsAutoRebuild { autoRebuilding[event.Config.Module] = true @@ -673,7 +672,7 @@ func (e *Engine) watchForEventsToPublish(ctx context.Context) { case ModuleDeployStarted: if isIdle { isIdle = false - e.BuildUpdates.Publish(EngineStarted{}) + e.EngineUpdates.Publish(EngineStarted{}) } deploying[event.Module] = true delete(moduleErrors, event.Module) @@ -684,7 +683,11 @@ func (e *Engine) watchForEventsToPublish(ctx context.Context) { delete(deploying, event.Module) delete(moduleErrors, event.Module) } - e.BuildUpdates.Publish(rawEvent) + engineEvent, ok := rawEvent.(EngineEvent) + if !ok { + panic(fmt.Sprintf("unexpected raw event type: %T", rawEvent)) + } + e.EngineUpdates.Publish(engineEvent) } if !isIdle && len(explicitlyBuilding) == 0 && len(autoRebuilding) == 0 && len(deploying) == 0 { endTime = time.Now() @@ -731,13 +734,13 @@ func (e *Engine) BuildAndDeploy(ctx context.Context, replicas int32, waitForDepl e.modulesToBuild.Store(module.Config.Module, false) terminal.UpdateModuleState(ctx, module.Config.Module, terminal.BuildStateDeploying) - e.rawBuildUpdates <- ModuleDeployStarted{Module: module.Config.Module} + e.rawEngineUpdates <- ModuleDeployStarted{Module: module.Config.Module} err := Deploy(buildCtx, module, module.Deploy, replicas, waitForDeployOnline, e.client) if err != nil { - e.rawBuildUpdates <- ModuleDeployFailed{Module: module.Config.Module, Error: err} + e.rawEngineUpdates <- ModuleDeployFailed{Module: module.Config.Module, Error: err} return err } - e.rawBuildUpdates <- ModuleDeploySuccess{Module: module.Config.Module} + e.rawEngineUpdates <- ModuleDeploySuccess{Module: module.Config.Module} return nil }) return nil @@ -906,12 +909,12 @@ func (e *Engine) tryBuild(ctx context.Context, mustBuild map[string]bool, module } } - e.rawBuildUpdates <- ModuleBuildStarted{Config: meta.module.Config} + e.rawEngineUpdates <- ModuleBuildStarted{Config: meta.module.Config} err := e.build(ctx, moduleName, builtModules, schemas) if err != nil { - e.rawBuildUpdates <- ModuleBuildFailed{Config: meta.module.Config, Error: err} + e.rawEngineUpdates <- ModuleBuildFailed{Config: meta.module.Config, Error: err} } else { - e.rawBuildUpdates <- ModuleBuildSuccess{Config: meta.module.Config} + e.rawEngineUpdates <- ModuleBuildSuccess{Config: meta.module.Config} } if err == nil && callback != nil { // load latest meta as it may have been updated @@ -1052,22 +1055,22 @@ func (e *Engine) watchForAutoRebuilds(originalCtx context.Context) { } switch event := event.(type) { case languageplugin.AutoRebuildStartedEvent: - e.rawBuildUpdates <- ModuleBuildStarted{Config: meta.module.Config, IsAutoRebuild: true} + e.rawEngineUpdates <- ModuleBuildStarted{Config: meta.module.Config, IsAutoRebuild: true} case languageplugin.AutoRebuildEndedEvent: _, deploy, err := handleBuildResult(ctx, meta.module.Config, event.Result) if err != nil { - e.rawBuildUpdates <- ModuleBuildFailed{Config: meta.module.Config, IsAutoRebuild: true, Error: err} + e.rawEngineUpdates <- ModuleBuildFailed{Config: meta.module.Config, IsAutoRebuild: true, Error: err} continue } - e.rawBuildUpdates <- ModuleBuildSuccess{Config: meta.module.Config, IsAutoRebuild: true} + e.rawEngineUpdates <- ModuleBuildSuccess{Config: meta.module.Config, IsAutoRebuild: true} - e.rawBuildUpdates <- ModuleDeployStarted{Module: event.Module} + e.rawEngineUpdates <- ModuleDeployStarted{Module: event.Module} if err := Deploy(ctx, meta.module, deploy, 1, true, e.client); err != nil { - e.rawBuildUpdates <- ModuleDeployFailed{Module: event.Module, Error: err} + e.rawEngineUpdates <- ModuleDeployFailed{Module: event.Module, Error: err} continue } - e.rawBuildUpdates <- ModuleDeploySuccess{Module: event.Module} + e.rawEngineUpdates <- ModuleDeploySuccess{Module: event.Module} } case <-originalCtx.Done(): return diff --git a/internal/buildengine/terminal.go b/internal/buildengine/terminal.go index c7b67f955f..7b6ff20a94 100644 --- a/internal/buildengine/terminal.go +++ b/internal/buildengine/terminal.go @@ -2,7 +2,6 @@ package buildengine import ( "context" - "fmt" "github.com/alecthomas/types/pubsub" @@ -40,9 +39,6 @@ func updateTerminalWithEngineEvents(ctx context.Context, topic *pubsub.Topic[Eng terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateDeployed) case ModuleDeployFailed: terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateFailed) - - case rawEngineEvent: - panic(fmt.Sprintf("unhandled event %T", event)) } case <-ctx.Done(): return