Skip to content

Commit

Permalink
rename and make raw event not a type of engine event
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Oct 22, 2024
1 parent 2020e1e commit e61cc3e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 36 deletions.
2 changes: 1 addition & 1 deletion frontend/cli/cmd_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (d *devCmd) Run(ctx context.Context, k *kong.Kong, projConfig projectconfig
return err
}
if d.languageServer != nil {
d.languageServer.Subscribe(ctx, engine.BuildUpdates)
d.languageServer.Subscribe(ctx, engine.EngineUpdates)
}
return engine.Dev(ctx, d.Watch)
})
Expand Down
65 changes: 34 additions & 31 deletions internal/buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func (EngineEnded) buildEvent() {}
//
//sumtype:decl
type rawEngineEvent interface {
EngineEvent
rawBuildEvent()
}

Expand Down Expand Up @@ -180,11 +179,11 @@ type Engine struct {
// events coming in from plugins
pluginEvents chan languageplugin.PluginEvent

// internal channel for build updates
rawBuildUpdates chan rawEngineEvent
// internal channel for raw engine updates (does not include all state changes)
rawEngineUpdates chan rawEngineEvent

// topic to subscribe to engine events
BuildUpdates *pubsub.Topic[EngineEvent]
EngineUpdates *pubsub.Topic[EngineEvent]
}

type Option func(o *Engine)
Expand Down Expand Up @@ -235,8 +234,8 @@ func New(ctx context.Context, client DeployClient, projectRoot string, moduleDir
pluginEvents: make(chan languageplugin.PluginEvent, 128),
parallelism: runtime.NumCPU(),
modulesToBuild: xsync.NewMapOf[string, bool](),
rawBuildUpdates: make(chan rawEngineEvent, 128),
BuildUpdates: pubsub.New[EngineEvent](),
rawEngineUpdates: make(chan rawEngineEvent, 128),
EngineUpdates: pubsub.New[EngineEvent](),
}
for _, option := range options {
option(e)
Expand All @@ -250,7 +249,7 @@ func New(ctx context.Context, client DeployClient, projectRoot string, moduleDir
return nil, fmt.Errorf("failed to clean stubs: %w", err)
}

updateTerminalWithEngineEvents(ctx, e.BuildUpdates)
updateTerminalWithEngineEvents(ctx, e.EngineUpdates)

go e.watchForAutoRebuilds(ctx)
go e.watchForEventsToPublish(ctx)
Expand All @@ -273,7 +272,7 @@ func New(ctx context.Context, client DeployClient, projectRoot string, moduleDir
}
e.moduleMetas.Store(config.Module, meta)
e.modulesToBuild.Store(config.Module, true)
e.rawBuildUpdates <- ModuleAdded{Module: config.Module}
e.rawEngineUpdates <- ModuleAdded{Module: config.Module}
return nil
})
}
Expand Down Expand Up @@ -433,13 +432,13 @@ func (e *Engine) Deploy(ctx context.Context, replicas int32, waitForDeployOnline
if len(meta.module.Deploy) == 0 {
return fmt.Errorf("no files found to deploy for %q", moduleName)
}
e.rawBuildUpdates <- ModuleDeployStarted{Module: moduleName}
e.rawEngineUpdates <- ModuleDeployStarted{Module: moduleName}
err := Deploy(ctx, meta.module, meta.module.Deploy, replicas, waitForDeployOnline, e.client)
if err != nil {
e.rawBuildUpdates <- ModuleDeployFailed{Module: moduleName, Error: err}
e.rawEngineUpdates <- ModuleDeployFailed{Module: moduleName, Error: err}
return err
}
e.rawBuildUpdates <- ModuleDeploySuccess{Module: moduleName}
e.rawEngineUpdates <- ModuleDeploySuccess{Module: moduleName}
return nil
})
}
Expand Down Expand Up @@ -521,7 +520,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
continue
}
e.moduleMetas.Store(config.Module, meta)
e.rawBuildUpdates <- ModuleAdded{Module: config.Module}
e.rawEngineUpdates <- ModuleAdded{Module: config.Module}
_ = e.BuildAndDeploy(ctx, 1, true, config.Module) //nolint:errcheck
}
case watch.WatchEventModuleRemoved:
Expand All @@ -537,7 +536,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
}
}
e.moduleMetas.Delete(event.Config.Module)
e.rawBuildUpdates <- ModuleRemoved{Module: event.Config.Module}
e.rawEngineUpdates <- ModuleRemoved{Module: event.Config.Module}
case watch.WatchEventModuleChanged:
// ftl.toml file has changed
meta, ok := e.moduleMetas.Load(event.Config.Module)
Expand Down Expand Up @@ -632,21 +631,21 @@ func (e *Engine) watchForEventsToPublish(ctx context.Context) {

publicBuildErrors := map[string]error{}
maps.Copy(moduleErrors, publicBuildErrors)
e.BuildUpdates.Publish(EngineEnded{ModuleErrors: publicBuildErrors})
e.EngineUpdates.Publish(EngineEnded{ModuleErrors: publicBuildErrors})

case rawEvent := <-e.rawBuildUpdates:
case rawEvent := <-e.rawEngineUpdates:
switch event := rawEvent.(type) {

case ModuleAdded:
e.BuildUpdates.Publish(event)
e.EngineUpdates.Publish(event)
case ModuleRemoved:
delete(moduleErrors, event.Module)
delete(explicitlyBuilding, event.Module)
delete(autoRebuilding, event.Module)
case ModuleBuildStarted:
if isIdle {
isIdle = false
e.BuildUpdates.Publish(EngineStarted{})
e.EngineUpdates.Publish(EngineStarted{})
}
if event.IsAutoRebuild {
autoRebuilding[event.Config.Module] = true
Expand All @@ -673,7 +672,7 @@ func (e *Engine) watchForEventsToPublish(ctx context.Context) {
case ModuleDeployStarted:
if isIdle {
isIdle = false
e.BuildUpdates.Publish(EngineStarted{})
e.EngineUpdates.Publish(EngineStarted{})
}
deploying[event.Module] = true
delete(moduleErrors, event.Module)
Expand All @@ -684,7 +683,11 @@ func (e *Engine) watchForEventsToPublish(ctx context.Context) {
delete(deploying, event.Module)
delete(moduleErrors, event.Module)
}
e.BuildUpdates.Publish(rawEvent)
engineEvent, ok := rawEvent.(EngineEvent)
if !ok {
panic(fmt.Sprintf("unexpected raw event type: %T", rawEvent))
}
e.EngineUpdates.Publish(engineEvent)
}
if !isIdle && len(explicitlyBuilding) == 0 && len(autoRebuilding) == 0 && len(deploying) == 0 {
endTime = time.Now()
Expand Down Expand Up @@ -731,13 +734,13 @@ func (e *Engine) BuildAndDeploy(ctx context.Context, replicas int32, waitForDepl
e.modulesToBuild.Store(module.Config.Module, false)
terminal.UpdateModuleState(ctx, module.Config.Module, terminal.BuildStateDeploying)

e.rawBuildUpdates <- ModuleDeployStarted{Module: module.Config.Module}
e.rawEngineUpdates <- ModuleDeployStarted{Module: module.Config.Module}
err := Deploy(buildCtx, module, module.Deploy, replicas, waitForDeployOnline, e.client)
if err != nil {
e.rawBuildUpdates <- ModuleDeployFailed{Module: module.Config.Module, Error: err}
e.rawEngineUpdates <- ModuleDeployFailed{Module: module.Config.Module, Error: err}
return err
}
e.rawBuildUpdates <- ModuleDeploySuccess{Module: module.Config.Module}
e.rawEngineUpdates <- ModuleDeploySuccess{Module: module.Config.Module}
return nil
})
return nil
Expand Down Expand Up @@ -906,12 +909,12 @@ func (e *Engine) tryBuild(ctx context.Context, mustBuild map[string]bool, module
}
}

e.rawBuildUpdates <- ModuleBuildStarted{Config: meta.module.Config}
e.rawEngineUpdates <- ModuleBuildStarted{Config: meta.module.Config}
err := e.build(ctx, moduleName, builtModules, schemas)
if err != nil {
e.rawBuildUpdates <- ModuleBuildFailed{Config: meta.module.Config, Error: err}
e.rawEngineUpdates <- ModuleBuildFailed{Config: meta.module.Config, Error: err}
} else {
e.rawBuildUpdates <- ModuleBuildSuccess{Config: meta.module.Config}
e.rawEngineUpdates <- ModuleBuildSuccess{Config: meta.module.Config}
}
if err == nil && callback != nil {
// load latest meta as it may have been updated
Expand Down Expand Up @@ -1052,22 +1055,22 @@ func (e *Engine) watchForAutoRebuilds(originalCtx context.Context) {
}
switch event := event.(type) {
case languageplugin.AutoRebuildStartedEvent:
e.rawBuildUpdates <- ModuleBuildStarted{Config: meta.module.Config, IsAutoRebuild: true}
e.rawEngineUpdates <- ModuleBuildStarted{Config: meta.module.Config, IsAutoRebuild: true}

case languageplugin.AutoRebuildEndedEvent:
_, deploy, err := handleBuildResult(ctx, meta.module.Config, event.Result)
if err != nil {
e.rawBuildUpdates <- ModuleBuildFailed{Config: meta.module.Config, IsAutoRebuild: true, Error: err}
e.rawEngineUpdates <- ModuleBuildFailed{Config: meta.module.Config, IsAutoRebuild: true, Error: err}
continue
}
e.rawBuildUpdates <- ModuleBuildSuccess{Config: meta.module.Config, IsAutoRebuild: true}
e.rawEngineUpdates <- ModuleBuildSuccess{Config: meta.module.Config, IsAutoRebuild: true}

e.rawBuildUpdates <- ModuleDeployStarted{Module: event.Module}
e.rawEngineUpdates <- ModuleDeployStarted{Module: event.Module}
if err := Deploy(ctx, meta.module, deploy, 1, true, e.client); err != nil {
e.rawBuildUpdates <- ModuleDeployFailed{Module: event.Module, Error: err}
e.rawEngineUpdates <- ModuleDeployFailed{Module: event.Module, Error: err}
continue
}
e.rawBuildUpdates <- ModuleDeploySuccess{Module: event.Module}
e.rawEngineUpdates <- ModuleDeploySuccess{Module: event.Module}
}
case <-originalCtx.Done():
return
Expand Down
4 changes: 0 additions & 4 deletions internal/buildengine/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package buildengine

import (
"context"
"fmt"

"github.com/alecthomas/types/pubsub"

Expand Down Expand Up @@ -40,9 +39,6 @@ func updateTerminalWithEngineEvents(ctx context.Context, topic *pubsub.Topic[Eng
terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateDeployed)
case ModuleDeployFailed:
terminal.UpdateModuleState(ctx, event.Module, terminal.BuildStateFailed)

case rawEngineEvent:
panic(fmt.Sprintf("unhandled event %T", event))
}
case <-ctx.Done():
return
Expand Down

0 comments on commit e61cc3e

Please sign in to comment.