From d1ada7e5e73190468572a0a5964f2b852b0890c1 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 09:20:12 +1100 Subject: [PATCH 01/13] feat: add external plugin to build engine # Conflicts: # backend/protos/xyz/block/ftl/v1/language/mixins.go # Conflicts: # frontend/cli/main.go --- backend/controller/admin/local_client.go | 2 +- .../xyz/block/ftl/v1/language/mixins.go | 14 + frontend/cli/cmd_schema_diff.go | 2 +- internal/buildengine/build.go | 10 +- internal/buildengine/engine.go | 8 +- .../languageplugin/external_plugin.go | 420 ++++++++++++++++++ .../languageplugin/external_plugin_client.go | 158 +++++++ .../languageplugin/external_plugin_test.go | 393 ++++++++++++++++ .../buildengine/languageplugin/go_plugin.go | 6 +- .../buildengine/languageplugin/java_plugin.go | 10 +- .../languageplugin/java_plugin_test.go | 9 +- internal/buildengine/languageplugin/plugin.go | 87 ++-- .../buildengine/languageplugin/rust_plugin.go | 6 +- internal/moduleconfig/moduleconfig.go | 13 +- internal/moduleconfig/moduleconfig_test.go | 9 +- 15 files changed, 1078 insertions(+), 69 deletions(-) create mode 100644 internal/buildengine/languageplugin/external_plugin.go create mode 100644 internal/buildengine/languageplugin/external_plugin_client.go create mode 100644 internal/buildengine/languageplugin/external_plugin_test.go diff --git a/backend/controller/admin/local_client.go b/backend/controller/admin/local_client.go index 5334087aae..c89f3e214d 100644 --- a/backend/controller/admin/local_client.go +++ b/backend/controller/admin/local_client.go @@ -62,7 +62,7 @@ func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context, bAllocator op if err != nil { moduleSchemas <- either.RightOf[*schema.Module](fmt.Errorf("could not load plugin for %s: %w", m.Module, err)) } - defer plugin.Kill(ctx) // nolint:errcheck + defer plugin.Kill() // nolint:errcheck customDefaults, err := plugin.ModuleConfigDefaults(ctx, m.Dir) if err != nil { diff --git a/backend/protos/xyz/block/ftl/v1/language/mixins.go b/backend/protos/xyz/block/ftl/v1/language/mixins.go index bf76ad09f8..faddd24261 100644 --- a/backend/protos/xyz/block/ftl/v1/language/mixins.go +++ b/backend/protos/xyz/block/ftl/v1/language/mixins.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/TBD54566975/ftl/internal/builderrors" + "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/slices" ) @@ -72,3 +73,16 @@ func PosFromProto(pos *Position) builderrors.Position { Filename: pos.Filename, } } + +func LogLevelFromProto(level LogMessage_LogLevel) log.Level { + switch level { + case LogMessage_INFO: + return log.Info + case LogMessage_WARN: + return log.Warn + case LogMessage_ERROR: + return log.Error + default: + panic(fmt.Sprintf("unhandled Log_Level %v", level)) + } +} diff --git a/frontend/cli/cmd_schema_diff.go b/frontend/cli/cmd_schema_diff.go index 661727247f..60ea4a74c3 100644 --- a/frontend/cli/cmd_schema_diff.go +++ b/frontend/cli/cmd_schema_diff.go @@ -116,7 +116,7 @@ func localSchema(ctx context.Context, projectConfig projectconfig.Config, bindAl if err != nil { moduleSchemas <- either.RightOf[*schema.Module](err) } - defer plugin.Kill(ctx) // nolint:errcheck + defer plugin.Kill() // nolint:errcheck customDefaults, err := plugin.ModuleConfigDefaults(ctx, m.Dir) if err != nil { diff --git a/internal/buildengine/build.go b/internal/buildengine/build.go index b87134b2e9..7a629b369a 100644 --- a/internal/buildengine/build.go +++ b/internal/buildengine/build.go @@ -20,17 +20,17 @@ import ( // Build a module in the given directory given the schema and module config. // // A lock file is used to ensure that only one build is running at a time. -func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectRootDir string, sch *schema.Schema, config moduleconfig.ModuleConfig, buildEnv []string, devMode bool) (moduleSchema *schema.Module, deploy []string, err error) { - logger := log.FromContext(ctx).Module(config.Module).Scope("build") +func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectRootDir string, bctx languageplugin.BuildContext, buildEnv []string, devMode bool) (moduleSchema *schema.Module, deploy []string, err error) { + logger := log.FromContext(ctx).Module(bctx.Config.Module).Scope("build") ctx = log.ContextWithLogger(ctx, logger) logger.Infof("Building module") - result, err := plugin.Build(ctx, projectRootDir, config, sch, buildEnv, devMode) + result, err := plugin.Build(ctx, projectRootDir, bctx, buildEnv, devMode) if err != nil { - return handleBuildResult(ctx, config, either.RightOf[languageplugin.BuildResult](err)) + return handleBuildResult(ctx, bctx.Config, either.RightOf[languageplugin.BuildResult](err)) } - return handleBuildResult(ctx, config, either.LeftOf[error](result)) + return handleBuildResult(ctx, bctx.Config, either.LeftOf[error](result)) } // handleBuildResult processes the result of a build diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index d196f456aa..f38b652c74 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -487,7 +487,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration } if meta, ok := e.moduleMetas.Load(event.Config.Module); ok { meta.plugin.Updates().Unsubscribe(meta.events) - err := meta.plugin.Kill(ctx) + err := meta.plugin.Kill() if err != nil { didError = true e.reportBuildFailed(err) @@ -813,7 +813,11 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ e.listener.OnBuildStarted(meta.module) } - moduleSchema, deploy, err := build(ctx, meta.plugin, e.projectRoot, sch, meta.module.Config, e.buildEnv, e.devMode) + moduleSchema, deploy, err := build(ctx, meta.plugin, e.projectRoot, languageplugin.BuildContext{ + Config: meta.module.Config, + Schema: sch, + Dependencies: meta.module.Dependencies, + }, e.buildEnv, e.devMode) if err != nil { terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) return err diff --git a/internal/buildengine/languageplugin/external_plugin.go b/internal/buildengine/languageplugin/external_plugin.go new file mode 100644 index 0000000000..99b74a05d2 --- /dev/null +++ b/internal/buildengine/languageplugin/external_plugin.go @@ -0,0 +1,420 @@ +package languageplugin + +import ( + "context" + "fmt" + "net/url" + "time" + + "connectrpc.com/connect" + "github.com/alecthomas/kong" + "github.com/alecthomas/types/either" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/pubsub" + "google.golang.org/protobuf/types/known/structpb" + + langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/internal/builderrors" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/moduleconfig" + "github.com/TBD54566975/ftl/internal/projectconfig" + "github.com/TBD54566975/ftl/internal/schema" +) + +const launchTimeout = 10 * time.Second + +//sumtype:decl +type externalPluginCommand interface { + externalPluginCmd() +} + +type externalBuildCommand struct { + BuildContext + projectRoot string + rebuildAutomatically bool + + result chan either.Either[BuildResult, error] +} + +func (externalBuildCommand) externalPluginCmd() {} + +type externalPlugin struct { + client externalPluginClient + + // cancels the run() context + cancel context.CancelFunc + + // commands to execute + commands chan externalPluginCommand + + updates *pubsub.Topic[PluginEvent] +} + +var _ LanguagePlugin = &externalPlugin{} + +func newExternalPlugin(ctx context.Context, bind *url.URL, language string) (*externalPlugin, error) { + impl, err := newExternalPluginImpl(ctx, bind, language) + if err != nil { + return nil, err + } + return newExternalPluginForTesting(ctx, impl), nil +} + +func newExternalPluginForTesting(ctx context.Context, client externalPluginClient) *externalPlugin { + plugin := &externalPlugin{ + client: client, + commands: make(chan externalPluginCommand, 64), + updates: pubsub.New[PluginEvent](), + } + + var runCtx context.Context + runCtx, plugin.cancel = context.WithCancel(ctx) + go plugin.run(runCtx) + + return plugin +} + +func (p *externalPlugin) Kill() error { + p.cancel() + return p.client.kill() +} + +func (p *externalPlugin) Updates() *pubsub.Topic[PluginEvent] { + return p.updates +} + +func (p *externalPlugin) GetCreateModuleFlags(ctx context.Context) ([]*kong.Flag, error) { + res, err := p.client.getCreateModuleFlags(ctx, connect.NewRequest(&langpb.GetCreateModuleFlagsRequest{})) + if err != nil { + return nil, err + } + flags := []*kong.Flag{} + shorts := map[rune]string{} + for _, f := range res.Msg.Flags { + flag := &kong.Flag{ + Value: &kong.Value{ + Name: f.Name, + Help: f.Help, + Tag: &kong.Tag{}, + }, + } + if f.Envar != nil && *f.Envar != "" { + flag.Value.Tag.Envs = []string{*f.Envar} + } + if f.Default != nil && *f.Default != "" { + flag.Value.HasDefault = true + flag.Value.Default = *f.Default + } + if f.Short != nil && *f.Short != "" { + if len(*f.Short) > 1 { + return nil, fmt.Errorf("invalid flag declared: short flag %q for %v must be a single character", *f.Short, f.Name) + } + short := rune((*f.Short)[0]) + if existingFullName, ok := shorts[short]; ok { + return nil, fmt.Errorf("multiple flags declared with the same short name: %v and %v", existingFullName, f.Name) + } + flag.Short = short + shorts[short] = f.Name + + } + if f.Placeholder != nil && *f.Placeholder != "" { + flag.PlaceHolder = *f.Placeholder + } + flags = append(flags, flag) + } + return flags, nil +} + +// CreateModule creates a new module in the given directory with the given name and language. +func (p *externalPlugin) CreateModule(ctx context.Context, projConfig projectconfig.Config, moduleConfig moduleconfig.ModuleConfig, flags map[string]string) error { + _, err := p.client.createModule(ctx, connect.NewRequest(&langpb.CreateModuleRequest{ + Name: moduleConfig.Module, + Path: moduleConfig.Dir, + ProjectConfig: &langpb.ProjectConfig{ + NoGit: projConfig.NoGit, + Hermit: projConfig.Hermit, + }, + })) + return err +} + +func (p *externalPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) { + resp, err := p.client.moduleConfigDefaults(ctx, connect.NewRequest(&langpb.ModuleConfigDefaultsRequest{ + Path: dir, + })) + if err != nil { + return moduleconfig.CustomDefaults{}, err + } + return moduleconfig.CustomDefaults{ + DeployDir: resp.Msg.DeployDir, + Watch: resp.Msg.Watch, + Build: optional.Ptr(resp.Msg.Build), + GeneratedSchemaDir: optional.Ptr(resp.Msg.GeneratedSchemaDir), + LanguageConfig: resp.Msg.LanguageConfig.AsMap(), + }, nil +} + +func (p *externalPlugin) GetDependencies(ctx context.Context, config moduleconfig.ModuleConfig) ([]string, error) { + configProto, err := protoFromModuleConfig(config) + if err != nil { + return nil, err + } + resp, err := p.client.getDependencies(ctx, connect.NewRequest(&langpb.DependenciesRequest{ + ModuleConfig: configProto, + })) + if err != nil { + return nil, err + } + return resp.Msg.Modules, nil +} + +// Build may result in a Build or BuildContextUpdated grpc call with the plugin, depending if a build stream is already set up +func (p *externalPlugin) Build(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool) (BuildResult, error) { + cmd := externalBuildCommand{ + BuildContext: bctx, + projectRoot: projectRoot, + rebuildAutomatically: rebuildAutomatically, + result: make(chan either.Either[BuildResult, error]), + } + p.commands <- cmd + select { + case result := <-cmd.result: + switch result := result.(type) { + case either.Left[BuildResult, error]: + return result.Get(), nil + case either.Right[BuildResult, error]: + return BuildResult{}, result.Get() //nolint:wrapcheck + default: + panic(fmt.Sprintf("unexpected result type %T", result)) + } + case <-ctx.Done(): + return BuildResult{}, fmt.Errorf("error waiting for build to complete: %w", ctx.Err()) + } +} + +func protoFromModuleConfig(c moduleconfig.ModuleConfig) (*langpb.ModuleConfig, error) { + config := c.Abs() + proto := &langpb.ModuleConfig{ + Name: config.Module, + Path: config.Dir, + DeployDir: config.DeployDir, + Watch: config.Watch, + } + if config.Build != "" { + proto.Build = &config.Build + } + if config.GeneratedSchemaDir != "" { + proto.GeneratedSchemaDir = &config.GeneratedSchemaDir + } + + langConfigProto, err := structpb.NewStruct(config.LanguageConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal language config: %w", err) + } + proto.LanguageConfig = langConfigProto + + return proto, nil +} + +func (p *externalPlugin) run(ctx context.Context) { + // State + var bctx BuildContext + var projectRoot string + + // if a current build stream is active, this is non-nil + // this does not indicate if the stream is listening to automatic rebuilds + var streamChan chan *langpb.BuildEvent + var streamCancel streamCancelFunc + + // if an explicit build command is active, this is non-nil + // if this is nil, streamChan may still be open for automatic rebuilds + var activeBuildCmd optional.Option[externalBuildCommand] + + // build counter is used to generate build request ids + var contextCounter = 0 + + logger := log.FromContext(ctx) + + for { + select { + // Process incoming commands + case cmd := <-p.commands: + switch c := cmd.(type) { + case externalBuildCommand: + // update state + projectRoot = c.projectRoot + bctx = c.BuildContext + logger = log.FromContext(ctx).Scope(bctx.Config.Module) + if _, ok := activeBuildCmd.Get(); ok { + c.result <- either.RightOf[BuildResult](fmt.Errorf("build already in progress")) + continue + } + configProto, err := protoFromModuleConfig(bctx.Config) + if err != nil { + c.result <- either.RightOf[BuildResult](err) + continue + } + + activeBuildCmd = optional.Some[externalBuildCommand](c) + contextCounter++ + + if streamChan != nil { + // tell plugin about new build context so that it rebuilds in existing build stream + p.client.buildContextUpdated(ctx, connect.NewRequest(&langpb.BuildContextUpdatedRequest{ + BuildContext: &langpb.BuildContext{ + Id: contextId(bctx.Config, contextCounter), + ModuleConfig: configProto, + Schema: bctx.Schema.ToProto().(*schemapb.Schema), //nolint:forcetypeassert + Dependencies: bctx.Dependencies, + }, + })) + continue + } + + newStreamChan, newCancelFunc, err := p.client.build(ctx, connect.NewRequest(&langpb.BuildRequest{ + ProjectPath: projectRoot, + RebuildAutomatically: c.rebuildAutomatically, + BuildContext: &langpb.BuildContext{ + Id: contextId(bctx.Config, contextCounter), + ModuleConfig: configProto, + Schema: bctx.Schema.ToProto().(*schemapb.Schema), //nolint:forcetypeassert + Dependencies: bctx.Dependencies, + }, + })) + if err != nil { + // TODO: error + continue + } + streamChan = newStreamChan + streamCancel = newCancelFunc + } + + // Receive messages from the current build stream + case e := <-streamChan: + if e == nil { + streamChan = nil + continue + } + + switch event := e.Event.(type) { + case *langpb.BuildEvent_LogMessage: + logger.Logf(langpb.LogLevelFromProto(event.LogMessage.Level), "%s", event.LogMessage.Message) + case *langpb.BuildEvent_AutoRebuildStarted: + if _, ok := activeBuildCmd.Get(); ok { + logger.Debugf("ignoring automatic rebuild started during explicit build") + continue + } + p.updates.Publish(AutoRebuildStartedEvent{ + Module: bctx.Config.Module, + }) + case *langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure: + streamEnded := false + cmdEnded := false + result, eventContextId, isAutomaticRebuild := getBuildSuccessOrFailure(e) + if activeBuildCmd.Ok() == isAutomaticRebuild { + logger.Debugf("ignoring automatic rebuild while expecting explicit build") + continue + } else if eventContextId != contextId(bctx.Config, contextCounter) { + logger.Debugf("received build for outdated context %q; expected %q", eventContextId, contextId(bctx.Config, contextCounter)) + continue + } + streamEnded, cmdEnded = p.handleBuildResult(bctx.Config.Module, result, activeBuildCmd) + if streamEnded { + streamCancel() + streamChan = nil + } + if cmdEnded { + activeBuildCmd = optional.None[externalBuildCommand]() + } + } + + case <-ctx.Done(): + if streamCancel != nil { + streamCancel() + } + return + } + } +} + +// getBuildSuccessOrFailure takes a BuildFailure or BuildSuccess event and returns the shared fields and an either wrapped result. +// This makes it easier to have some shared logic for both event types. +func getBuildSuccessOrFailure(e *langpb.BuildEvent) (result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], contextId string, isAutomaticRebuild bool) { + switch e := e.Event.(type) { + case *langpb.BuildEvent_BuildSuccess: + return either.LeftOf[*langpb.BuildEvent_BuildFailure](e), e.BuildSuccess.ContextId, e.BuildSuccess.IsAutomaticRebuild + case *langpb.BuildEvent_BuildFailure: + return either.RightOf[*langpb.BuildEvent_BuildSuccess](e), e.BuildFailure.ContextId, e.BuildFailure.IsAutomaticRebuild + default: + panic(fmt.Sprintf("unexpected event type %T", e)) + } +} + +// handleBuildResult processes the result of a build and publishes the appropriate events. +func (p *externalPlugin) handleBuildResult(module string, result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], activeBuildCmd optional.Option[externalBuildCommand]) (streamEnded, cmdEnded bool) { + buildResult, err := buildResultFromProto(result) + if cmd, ok := activeBuildCmd.Get(); ok { + // handle explicit build + if err != nil { + cmd.result <- either.RightOf[BuildResult](err) + } else { + cmd.result <- either.LeftOf[error](buildResult) + } + cmdEnded = true + if !cmd.rebuildAutomatically { + streamEnded = true + } + return + } + // handle auto rebuild + if err != nil { + p.updates.Publish(AutoRebuildEndedEvent{ + Module: module, + Result: either.RightOf[BuildResult](err), + }) + } else { + p.updates.Publish(AutoRebuildEndedEvent{ + Module: module, + Result: either.LeftOf[error](buildResult), + }) + } + return +} + +func buildResultFromProto(result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure]) (buildResult BuildResult, err error) { + switch result := result.(type) { + case either.Left[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure]: + buildSuccess := result.Get().BuildSuccess + var moduleSch *schema.Module + if buildSuccess.Module != nil { + sch, err := schema.ModuleFromProto(buildSuccess.Module) + if err != nil { + return BuildResult{}, fmt.Errorf("failed to parse schema: %w", err) + } + moduleSch = sch + } + + errs := langpb.ErrorsFromProto(buildSuccess.Errors) + builderrors.SortErrorsByPosition(errs) + return BuildResult{ + Errors: errs, + Schema: moduleSch, + }, nil + case either.Right[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure]: + buildFailure := result.Get().BuildFailure + + errs := langpb.ErrorsFromProto(buildFailure.Errors) + builderrors.SortErrorsByPosition(errs) + return BuildResult{ + Errors: errs, + InvalidateDependencies: buildFailure.InvalidateDependencies, + }, nil + default: + panic(fmt.Sprintf("unexpected result type %T", result)) + } +} + +func contextId(config moduleconfig.ModuleConfig, counter int) string { + return fmt.Sprintf("%v-%v", config.Module, counter) +} diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go new file mode 100644 index 0000000000..124b9a0126 --- /dev/null +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -0,0 +1,158 @@ +package languageplugin + +import ( + "context" + "fmt" + "net/url" + "syscall" + + "connectrpc.com/connect" + "github.com/jpillora/backoff" + + langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" + langconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language/languagepbconnect" + "github.com/TBD54566975/ftl/internal/exec" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/rpc" +) + +// TODO: rename all this! + +type streamCancelFunc func() + +type externalPluginClient interface { + getCreateModuleFlags(ctx context.Context, req *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) + createModule(ctx context.Context, req *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) + moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) + getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) + + // TODO: when not watching, does plugin need a way of closing the stream / chan? + build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan *langpb.BuildEvent, streamCancelFunc, error) + buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) + + kill() error +} + +var _ externalPluginClient = &externalPluginImpl{} + +type externalPluginImpl struct { + cmd *exec.Cmd + client langconnect.LanguageServiceClient +} + +func newExternalPluginImpl(ctx context.Context, bind *url.URL, language string) (*externalPluginImpl, error) { + impl := &externalPluginImpl{ + client: rpc.Dial(langconnect.NewLanguageServiceClient, bind.String(), log.Error), + } + err := impl.start(ctx, bind, language) + if err != nil { + return nil, fmt.Errorf("failed to start plugin: %w", err) + } + return impl, nil +} + +// Start launches the plugin and blocks until the plugin is ready. +func (p *externalPluginImpl) start(ctx context.Context, bind *url.URL, language string) error { + // TODO: think more about whether this is a good log level + // TODO: think more about whether cmd's path should be the current directory, or the module's + cmdName := "ftl-language-" + language + // TODO: document says that we pass in dir... but I dont think we need to anymore + p.cmd = exec.Command(ctx, log.Debug, ".", cmdName, "--bind", bind.String()) + _, err := exec.LookPath(cmdName) + if err != nil { + return fmt.Errorf("failed to find plugin for %s: %w", language, err) + } + + runCtx, cancel := context.WithCancel(ctx) + + cmdErr := make(chan error) + pingErr := make(chan error) + + // run the plugin and wait for it to finish executing + go func() { + err := p.cmd.RunBuffered(runCtx) + if err != nil { + cmdErr <- fmt.Errorf("language plugin failed: %w", err) + cancel() + } + close(cmdErr) + }() + go func() { + // wait for the plugin to be ready + if err := p.ping(runCtx); err != nil { + cancel() + pingErr <- fmt.Errorf("failed to ping plugin") + } + close(pingErr) + }() + + select { + case err := <-cmdErr: + return err + case err := <-pingErr: + if err != nil { + return nil + } + return fmt.Errorf("failed to start plugin: %w", err) + case <-ctx.Done(): + return fmt.Errorf("failed to start plugin: %w", ctx.Err()) + } +} + +func (p *externalPluginImpl) ping(ctx context.Context) error { + retry := backoff.Backoff{} + heartbeatCtx, cancel := context.WithTimeout(ctx, launchTimeout) + defer cancel() + err := rpc.Wait(heartbeatCtx, retry, p.client) + if err != nil { + return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to connect to runner: %w", err)) + } + return nil +} + +func (p *externalPluginImpl) kill() error { + // TODO: cancel run() ctx + return p.cmd.Kill(syscall.SIGINT) +} + +func (p *externalPluginImpl) getCreateModuleFlags(ctx context.Context, req *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) { + return p.client.GetCreateModuleFlags(ctx, req) +} + +func (p *externalPluginImpl) moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) { + return p.client.ModuleConfigDefaults(ctx, req) +} + +func (p *externalPluginImpl) createModule(ctx context.Context, req *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) { + return p.client.CreateModule(ctx, req) +} + +func (p *externalPluginImpl) getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) { + return p.client.GetDependencies(ctx, req) +} + +func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan *langpb.BuildEvent, streamCancelFunc, error) { + stream, err := p.client.Build(ctx, req) + if err != nil { + return nil, nil, err + } + + streamChan := make(chan *langpb.BuildEvent, 64) + go streamToChan(stream, streamChan) + + return streamChan, func() { + stream.Close() + close(streamChan) + }, err +} + +func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch chan *langpb.BuildEvent) { + for stream.Receive() { + ch <- stream.Msg() + } + close(ch) +} + +func (p *externalPluginImpl) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) { + return p.client.BuildContextUpdated(ctx, req) +} diff --git a/internal/buildengine/languageplugin/external_plugin_test.go b/internal/buildengine/languageplugin/external_plugin_test.go new file mode 100644 index 0000000000..e68a9d78ec --- /dev/null +++ b/internal/buildengine/languageplugin/external_plugin_test.go @@ -0,0 +1,393 @@ +package languageplugin + +import ( + "context" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/atomic" + "github.com/alecthomas/kong" + "github.com/alecthomas/types/either" + "github.com/alecthomas/types/optional" + + "connectrpc.com/connect" + langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" + "github.com/TBD54566975/ftl/internal/builderrors" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/moduleconfig" + "github.com/TBD54566975/ftl/internal/schema" +) + +type testBuildContext struct { + BuildContext + ContextId string + IsRebuild bool +} + +type mockExternalPluginClient struct { + flags []*langpb.GetCreateModuleFlagsResponse_Flag + + buildEvents chan *langpb.BuildEvent + latestBuildContext atomic.Value[testBuildContext] +} + +var _ externalPluginClient = &mockExternalPluginClient{} + +func newMockExternalPlugin() *mockExternalPluginClient { + return &mockExternalPluginClient{ + buildEvents: make(chan *langpb.BuildEvent, 64), + } +} + +func (p *mockExternalPluginClient) getCreateModuleFlags(context.Context, *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) { + return connect.NewResponse(&langpb.GetCreateModuleFlagsResponse{ + Flags: p.flags, + }), nil +} + +func (p *mockExternalPluginClient) createModule(context.Context, *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) { + panic("not implemented") +} + +func (p *mockExternalPluginClient) moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) { + return connect.NewResponse(&langpb.ModuleConfigDefaultsResponse{ + DeployDir: "test-deploy-dir", + Watch: []string{"a", "b", "c"}, + }), nil +} + +func (p *mockExternalPluginClient) getDependencies(context.Context, *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) { + panic("not implemented") +} + +func buildContextFromProto(proto *langpb.BuildContext) (BuildContext, error) { + sch, err := schema.FromProto(proto.Schema) + if err != nil { + return BuildContext{}, err + } + return BuildContext{ + Schema: sch, + Dependencies: proto.Dependencies, + Config: moduleconfig.ModuleConfig{ + Dir: proto.ModuleConfig.Path, + Language: "test", + Realm: "test", + Module: proto.ModuleConfig.Name, + Build: optional.Ptr(proto.ModuleConfig.Build).Default(""), + DeployDir: proto.ModuleConfig.DeployDir, + GeneratedSchemaDir: optional.Ptr(proto.ModuleConfig.GeneratedSchemaDir).Default(""), + Watch: proto.ModuleConfig.Watch, + LanguageConfig: proto.ModuleConfig.LanguageConfig.AsMap(), + }, + }, nil +} + +func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan *langpb.BuildEvent, streamCancelFunc, error) { + bctx, err := buildContextFromProto(req.Msg.BuildContext) + if err != nil { + return nil, nil, err + } + p.latestBuildContext.Store(testBuildContext{ + BuildContext: bctx, + ContextId: req.Msg.BuildContext.Id, + IsRebuild: false, + }) + return p.buildEvents, func() {}, nil +} + +func (p *mockExternalPluginClient) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) { + bctx, err := buildContextFromProto(req.Msg.BuildContext) + if err != nil { + return nil, err + } + p.latestBuildContext.Store(testBuildContext{ + BuildContext: bctx, + ContextId: req.Msg.BuildContext.Id, + IsRebuild: true, + }) + return connect.NewResponse(&langpb.BuildContextUpdatedResponse{}), nil +} + +func (p *mockExternalPluginClient) kill() error { + return nil +} + +func setUp() (context.Context, *externalPlugin, *mockExternalPluginClient, BuildContext) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + + mockImpl := newMockExternalPlugin() + plugin := newExternalPluginForTesting(ctx, mockImpl) + + bctx := BuildContext{ + Config: moduleconfig.ModuleConfig{ + Module: "name", + Dir: "test/dir", + Language: "test-lang", + }, + Schema: &schema.Schema{}, + Dependencies: []string{}, + } + return ctx, plugin, mockImpl, bctx +} + +func TestCreateModuleFlags(t *testing.T) { + for _, tt := range []struct { + protoFlags []*langpb.GetCreateModuleFlagsResponse_Flag + expectedFlags []*kong.Flag + expectedError optional.Option[string] + }{ + { + protoFlags: []*langpb.GetCreateModuleFlagsResponse_Flag{ + { + Name: "full-flag", + Help: "This has all the fields set", + Envar: optional.Some("full-flag").Ptr(), + Short: optional.Some("f").Ptr(), + Placeholder: optional.Some("placeholder").Ptr(), + Default: optional.Some("defaultValue").Ptr(), + }, + { + Name: "sparse-flag", + Help: "This has only the minimum fields set", + }, + }, + expectedFlags: []*kong.Flag{ + { + Value: &kong.Value{ + Name: "full-flag", + Help: "This has all the fields set", + HasDefault: true, + Default: "defaultValue", + Tag: &kong.Tag{ + Envs: []string{ + "full-flag", + }, + }, + }, + PlaceHolder: "placeholder", + Short: 'f', + }, + { + Value: &kong.Value{ + Name: "sparse-flag", + Help: "This has only the minimum fields set", + Tag: &kong.Tag{}, + }, + }, + }, + }, + { + protoFlags: []*langpb.GetCreateModuleFlagsResponse_Flag{ + { + Name: "multi-char-short", + Help: "This has all the fields set", + Short: optional.Some("multi").Ptr(), + }, + }, + expectedError: optional.Some(`invalid flag declared: short flag "multi" for multi-char-short must be a single character`), + }, + { + protoFlags: []*langpb.GetCreateModuleFlagsResponse_Flag{ + { + Name: "dupe-short-1", + Help: "Short must be unique", + Short: optional.Some("d").Ptr(), + }, + { + Name: "dupe-short-2", + Help: "Short must be unique", + Short: optional.Some("d").Ptr(), + }, + }, + expectedError: optional.Some(`multiple flags declared with the same short name: dupe-short-1 and dupe-short-2`), + }, + } { + t.Run(tt.protoFlags[0].Name, func(t *testing.T) { + t.Parallel() + + ctx, plugin, mockImpl, _ := setUp() + mockImpl.flags = tt.protoFlags + kongFlags, err := plugin.GetCreateModuleFlags(ctx) + if expectedError, ok := tt.expectedError.Get(); ok { + assert.Contains(t, err.Error(), expectedError) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.expectedFlags, kongFlags) + }) + } +} + +func TestSimultaneousBuild(t *testing.T) { + t.Parallel() + ctx, plugin, _, bctx := setUp() + _ = beginBuild(ctx, plugin, bctx, false) + r := beginBuild(ctx, plugin, bctx, false) + result, ok := (<-r).(either.Right[BuildResult, error]) + assert.True(t, ok, "expected error, got %v", result) + assert.Contains(t, result.Get().Error(), "build already in progress") +} + +func TestMismatchedBuildContextId(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + // build + result := beginBuild(ctx, plugin, bctx, false) + + // send mismatched build result (ie: a different build attempt completing) + mockImpl.buildEvents <- buildEventWithBuildError("fake", false, "this is not the result you are looking for") + + // send automatic rebuild result for the same context id (should be ignored) + realId := mockImpl.latestBuildContext.Load().ContextId + mockImpl.buildEvents <- buildEventWithBuildError(realId, true, "this is not the result you are looking for") + + // send real build result + mockImpl.buildEvents <- buildEventWithBuildError(realId, false, "this is the correct result") + + // check result + checkResult(t, <-result, "this is the correct result") +} + +func TestRebuilds(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + // build and activate automatic rebuilds + result := beginBuild(ctx, plugin, bctx, true) + + // send first build result + testBuildCtx := mockImpl.latestBuildContext.Load() + mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextId, false, "first build") + + // check result + checkResult(t, <-result, "first build") + + // send rebuild request with updated schema + bctx.Schema.Modules = append(bctx.Schema.Modules, &schema.Module{Name: "another"}) + sch, err := schema.ValidateSchema(bctx.Schema) + assert.NoError(t, err, "schema should be valid") + result = beginBuild(ctx, plugin, bctx, true) + + // send rebuild result + testBuildCtx = mockImpl.latestBuildContext.Load() + assert.Equal(t, testBuildCtx.Schema, sch, "schema should have been updated") + mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextId, false, "second build") + + // check rebuild result + checkResult(t, <-result, "second build") +} + +func TestAutomaticRebuilds(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + updates := make(chan PluginEvent, 64) + plugin.Updates().Subscribe(updates) + + // build and activate automatic rebuilds + result := beginBuild(ctx, plugin, bctx, true) + + // plugin sends auto rebuild has started event (should be ignored) + mockImpl.buildEvents <- &langpb.BuildEvent{ + Event: &langpb.BuildEvent_AutoRebuildStarted{}, + } + // plugin sends auto rebuild event (should be ignored) + mockImpl.buildEvents <- buildEventWithBuildError("fake", true, "auto rebuild to ignore") + + // send first build result + time.Sleep(500 * time.Millisecond) + buildCtx := mockImpl.latestBuildContext.Load() + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextId, false, "first build") + + // check result + checkResult(t, <-result, "first build") + + // confirm that nothing was posted to Updates() (ie: the auto-rebuilds events were ignored) + select { + case <-updates: + t.Fatalf("expected auto rebuilds events to not get published while build is in progress") + case <-time.After(2 * time.Second): + // as expected, no events published plugin + } + + // plugin sends auto rebuild events + mockImpl.buildEvents <- &langpb.BuildEvent{ + Event: &langpb.BuildEvent_AutoRebuildStarted{}, + } + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextId, true, "first real auto rebuild") + // plugin sends auto rebuild events again (this time with no rebuild started event) + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextId, true, "second real auto rebuild") + + // confirm that auto rebuilds events were published + events := eventsFromChannel(updates) + assert.Equal(t, len(events), 3, "expected 3 events") + assert.Equal(t, PluginEvent(AutoRebuildStartedEvent{Module: bctx.Config.Module}), events[0]) + checkAutoRebuildResult(t, events[1], "first real auto rebuild") + checkAutoRebuildResult(t, events[2], "second real auto rebuild") +} + +func eventsFromChannel(updates chan PluginEvent) []PluginEvent { + // wait a bit to let events get published + time.Sleep(200 * time.Millisecond) + + events := []PluginEvent{} + for { + select { + case e := <-updates: + events = append(events, e) + default: + // no more events available right now + return events + } + } +} + +func buildEventWithBuildError(contextId string, isAutomaticRebuild bool, msg string) *langpb.BuildEvent { + return &langpb.BuildEvent{ + Event: &langpb.BuildEvent_BuildFailure{ + BuildFailure: &langpb.BuildFailure{ + ContextId: contextId, + IsAutomaticRebuild: isAutomaticRebuild, + Errors: langpb.ErrorsToProto([]builderrors.Error{ + { + Msg: msg, + }, + }), + }, + }, + } +} + +func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, autoRebuild bool) chan either.Either[BuildResult, error] { + result := make(chan either.Either[BuildResult, error]) + go func() { + r, err := plugin.Build(ctx, "", bctx, []string{}, autoRebuild) + if err != nil { + result <- either.RightOf[BuildResult](err) + } else { + result <- either.LeftOf[error](r) + } + }() + // sleep to make sure impl has received the build context + time.Sleep(500 * time.Millisecond) + return result +} + +func checkResult(t *testing.T, r either.Either[BuildResult, error], expectedMsg string) { + t.Helper() + left, ok := r.(either.Left[BuildResult, error]) + assert.True(t, ok, "expected build result, got %v", r) + + buildResult := left.Get() + assert.Equal(t, len(buildResult.Errors), 1) + assert.Equal(t, buildResult.Errors[0].Msg, expectedMsg) +} + +func checkAutoRebuildResult(t *testing.T, e PluginEvent, expectedMsg string) { + t.Helper() + event, ok := e.(AutoRebuildEndedEvent) + assert.True(t, ok, "expected auto rebuild event, got %v", e) + checkResult(t, event.Result, expectedMsg) +} diff --git a/internal/buildengine/languageplugin/go_plugin.go b/internal/buildengine/languageplugin/go_plugin.go index 5266c5d074..59c7288494 100644 --- a/internal/buildengine/languageplugin/go_plugin.go +++ b/internal/buildengine/languageplugin/go_plugin.go @@ -23,7 +23,6 @@ import ( "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/moduleconfig" "github.com/TBD54566975/ftl/internal/projectconfig" - "github.com/TBD54566975/ftl/internal/schema" "github.com/TBD54566975/ftl/internal/watch" ) @@ -159,8 +158,9 @@ func (p *goPlugin) GetDependencies(ctx context.Context, config moduleconfig.Modu }) } -func buildGo(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { - moduleSch, buildErrs, err := compile.Build(ctx, projectRoot, config.Dir, config, sch, transaction, buildEnv, devMode) +func buildGo(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { + config := bctx.Config.Abs() + moduleSch, buildErrs, err := compile.Build(ctx, projectRoot, config.Dir, config, bctx.Schema, transaction, buildEnv, devMode) if err != nil { return BuildResult{}, CompilerBuildError{err: fmt.Errorf("failed to build module %q: %w", config.Module, err)} } diff --git a/internal/buildengine/languageplugin/java_plugin.go b/internal/buildengine/languageplugin/java_plugin.go index 14de6643ee..d95ba19298 100644 --- a/internal/buildengine/languageplugin/java_plugin.go +++ b/internal/buildengine/languageplugin/java_plugin.go @@ -14,6 +14,7 @@ import ( "github.com/TBD54566975/scaffolder" "github.com/alecthomas/kong" + "github.com/alecthomas/types/optional" "github.com/beevik/etree" "github.com/go-viper/mapstructure/v2" "golang.org/x/exp/maps" @@ -66,7 +67,7 @@ func newJavaPlugin(ctx context.Context, language string) *javaPlugin { func (p *javaPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) { defaults := moduleconfig.CustomDefaults{ - GeneratedSchemaDir: "src/main/ftl-module-schema", + GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), // Watch defaults to files related to maven and gradle Watch: []string{"pom.xml", "src/**", "build/generated", "target/generated-sources"}, } @@ -78,13 +79,13 @@ func (p *javaPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (modu defaults.LanguageConfig = map[string]any{ "build-tool": JavaBuildToolMaven, } - defaults.Build = "mvn -B package" + defaults.Build = optional.Some("mvn -B package") defaults.DeployDir = "target" } else if fileExists(buildGradle) || fileExists(buildGradleKts) { defaults.LanguageConfig = map[string]any{ "build-tool": JavaBuildToolGradle, } - defaults.Build = "gradle build" + defaults.Build = optional.Some("gradle build") defaults.DeployDir = "build" } else { return moduleconfig.CustomDefaults{}, fmt.Errorf("could not find JVM build file in %s", dir) @@ -247,9 +248,10 @@ func extractKotlinFTLImports(self, dir string) ([]string, error) { return modules, nil } -func buildJava(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { +func buildJava(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { // TODO: add back // Deploy: + config := bctx.Config.Abs() logger := log.FromContext(ctx) javaConfig, err := loadJavaConfig(config.LanguageConfig, config.Language) if err != nil { diff --git a/internal/buildengine/languageplugin/java_plugin_test.go b/internal/buildengine/languageplugin/java_plugin_test.go index f7bcb950e4..8cb8b7272e 100644 --- a/internal/buildengine/languageplugin/java_plugin_test.go +++ b/internal/buildengine/languageplugin/java_plugin_test.go @@ -7,6 +7,7 @@ import ( "github.com/TBD54566975/ftl/internal/moduleconfig" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" ) func TestExtractModuleDepsKotlin(t *testing.T) { @@ -32,9 +33,9 @@ func TestJavaConfigDefaults(t *testing.T) { language: "kotlin", dir: "testdata/echokotlin", expected: moduleconfig.CustomDefaults{ - Build: "mvn -B package", + Build: optional.Some("mvn -B package"), DeployDir: "target", - GeneratedSchemaDir: "src/main/ftl-module-schema", + GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), Watch: watch, LanguageConfig: map[string]any{ "build-tool": "maven", @@ -45,9 +46,9 @@ func TestJavaConfigDefaults(t *testing.T) { language: "kotlin", dir: "testdata/externalkotlin", expected: moduleconfig.CustomDefaults{ - Build: "mvn -B package", + Build: optional.Some("mvn -B package"), DeployDir: "target", - GeneratedSchemaDir: "src/main/ftl-module-schema", + GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), Watch: watch, LanguageConfig: map[string]any{ "build-tool": "maven", diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index a3f6e456ac..351277dacc 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -3,6 +3,7 @@ package languageplugin import ( "context" "fmt" + "net/url" "os" "path/filepath" "time" @@ -30,6 +31,9 @@ type BuildResult struct { // Files to deploy, relative to the module config's DeployDir Deploy []string + + // Docs + InvalidateDependencies bool } // PluginEvent is used to notify of updates from the plugin. @@ -57,6 +61,13 @@ type AutoRebuildEndedEvent struct { func (AutoRebuildEndedEvent) pluginEvent() {} func (e AutoRebuildEndedEvent) ModuleName() string { return e.Module } +// TODO: docs +type BuildContext struct { + Config moduleconfig.ModuleConfig + Schema *schema.Schema + Dependencies []string +} + // LanguagePlugin handles building and scaffolding modules in a specific language. type LanguagePlugin interface { // Updates topic for all update events from the plugin @@ -66,10 +77,10 @@ type LanguagePlugin interface { // GetModuleConfigDefaults provides custom defaults for the module config. // // The result may be cached by FTL, so defaulting logic should not be changing due to normal module changes. - // For example it is valid to return defaults based on which build tool is configured within the module directory, + // For example, it is valid to return defaults based on which build tool is configured within the module directory, // as that is not expected to change during normal operation. // It is not recommended to read the module's toml file to determine defaults, as when the toml file is updated, - // the defaults will not be recalculated. + // the module defaults will not be recalculated. ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) // GetCreateModuleFlags returns the flags that can be used to create a module for this language. @@ -79,15 +90,16 @@ type LanguagePlugin interface { CreateModule(ctx context.Context, projConfig projectconfig.Config, moduleConfig moduleconfig.ModuleConfig, flags map[string]string) error // GetDependencies returns the dependencies of the module. - GetDependencies(ctx context.Context, config moduleconfig.ModuleConfig) ([]string, error) + GetDependencies(ctx context.Context, moduleConfig moduleconfig.ModuleConfig) ([]string, error) // Build builds the module with the latest config and schema. // In dev mode, plugin is responsible for automatically rebuilding as relevant files within the module change, // and publishing these automatic builds updates to Updates(). - Build(ctx context.Context, projectRoot string, config moduleconfig.ModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool) (BuildResult, error) + // TODO: build env needed? + Build(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool) (BuildResult, error) // Kill stops the plugin and cleans up any resources. - Kill(ctx context.Context) error + Kill() error } // PluginFromConfig creates a new language plugin from the given config. @@ -100,7 +112,12 @@ func New(ctx context.Context, bindAllocator *bind.BindAllocator, language string case "rust": return newRustPlugin(ctx), nil default: - return p, fmt.Errorf("unknown language %q", language) + // TODO: get bind url + bind, err := url.Parse("localhost:80123") + if err != nil { + return nil, fmt.Errorf("could not parse bind url: %w", err) + } + return newExternalPlugin(ctx, bind, language) } } @@ -110,11 +127,10 @@ type pluginCommand interface { } type buildCommand struct { - projectRoot string - config moduleconfig.ModuleConfig - schema *schema.Schema - buildEnv []string - devMode bool + BuildContext + projectRoot string + buildEnv []string + rebuildAutomatically bool result chan either.Either[BuildResult, error] } @@ -130,7 +146,7 @@ type getDependenciesCommand struct { func (getDependenciesCommand) pluginCmd() {} -type buildFunc = func(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) +type buildFunc = func(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) type CompilerBuildError struct { err error @@ -175,19 +191,18 @@ func (p *internalPlugin) Updates() *pubsub.Topic[PluginEvent] { return p.updates } -func (p *internalPlugin) Kill(ctx context.Context) error { +func (p *internalPlugin) Kill() error { p.cancel() return nil } -func (p *internalPlugin) Build(ctx context.Context, projectRoot string, config moduleconfig.ModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool) (BuildResult, error) { +func (p *internalPlugin) Build(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool) (BuildResult, error) { cmd := buildCommand{ - projectRoot: projectRoot, - config: config, - schema: sch, - buildEnv: buildEnv, - devMode: devMode, - result: make(chan either.Either[BuildResult, error]), + BuildContext: bctx, + projectRoot: projectRoot, + buildEnv: buildEnv, + rebuildAutomatically: rebuildAutomatically, + result: make(chan either.Either[BuildResult, error]), } p.commands <- cmd select { @@ -232,11 +247,10 @@ func (p *internalPlugin) run(ctx context.Context) { // State // This is updated when given explicit build commands and used for automatic rebuilds - var config moduleconfig.ModuleConfig + var bctx BuildContext var projectRoot string - var schema *schema.Schema var buildEnv []string - devMode := false + watching := false for { select { @@ -244,19 +258,18 @@ func (p *internalPlugin) run(ctx context.Context) { switch c := cmd.(type) { case buildCommand: // update state + bctx = c.BuildContext projectRoot = c.projectRoot - config = c.config - schema = c.schema buildEnv = c.buildEnv if watcher == nil { - watcher = watch.NewWatcher(config.Watch...) + watcher = watch.NewWatcher(bctx.Config.Watch...) } // begin watching if needed - if c.devMode && !devMode { - devMode = true - topic, err := watcher.Watch(ctx, time.Second, []string{config.Abs().Dir}) + if c.rebuildAutomatically && !watching { + watching = true + topic, err := watcher.Watch(ctx, time.Second, []string{bctx.Config.Abs().Dir}) if err != nil { c.result <- either.RightOf[BuildResult](fmt.Errorf("failed to start watching: %w", err)) continue @@ -265,7 +278,7 @@ func (p *internalPlugin) run(ctx context.Context) { } // build - result, err := buildAndLoadResult(ctx, projectRoot, config, schema, buildEnv, devMode, watcher, p.buildFunc) + result, err := buildAndLoadResult(ctx, projectRoot, bctx, buildEnv, c.rebuildAutomatically, watcher, p.buildFunc) if err != nil { c.result <- either.RightOf[BuildResult](err) continue @@ -285,17 +298,17 @@ func (p *internalPlugin) run(ctx context.Context) { case watch.WatchEventModuleChanged: // automatic rebuild - p.updates.Publish(AutoRebuildStartedEvent{Module: config.Module}) - result, err := buildAndLoadResult(ctx, projectRoot, config, schema, buildEnv, devMode, watcher, p.buildFunc) + p.updates.Publish(AutoRebuildStartedEvent{Module: bctx.Config.Module}) + result, err := buildAndLoadResult(ctx, projectRoot, bctx, buildEnv, true, watcher, p.buildFunc) if err != nil { p.updates.Publish(AutoRebuildEndedEvent{ - Module: config.Module, + Module: bctx.Config.Module, Result: either.RightOf[BuildResult](err), }) continue } p.updates.Publish(AutoRebuildEndedEvent{ - Module: config.Module, + Module: bctx.Config.Module, Result: either.LeftOf[error](result), }) case watch.WatchEventModuleAdded: @@ -311,8 +324,8 @@ func (p *internalPlugin) run(ctx context.Context) { } } -func buildAndLoadResult(ctx context.Context, projectRoot string, c moduleconfig.ModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, watcher *watch.Watcher, build buildFunc) (BuildResult, error) { - config := c.Abs() +func buildAndLoadResult(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, watcher *watch.Watcher, build buildFunc) (BuildResult, error) { + config := bctx.Config.Abs() release, err := flock.Acquire(ctx, filepath.Join(config.Dir, ".ftl.lock"), BuildLockTimeout) if err != nil { return BuildResult{}, fmt.Errorf("could not acquire build lock for %v: %w", config.Module, err) @@ -329,7 +342,7 @@ func buildAndLoadResult(ctx context.Context, projectRoot string, c moduleconfig. } transaction := watcher.GetTransaction(config.Dir) - result, err := build(ctx, projectRoot, config, sch, buildEnv, devMode, transaction) + result, err := build(ctx, projectRoot, bctx, buildEnv, devMode, transaction) if err != nil { return BuildResult{}, err } diff --git a/internal/buildengine/languageplugin/rust_plugin.go b/internal/buildengine/languageplugin/rust_plugin.go index 7bf7d4aca3..1063850ec7 100644 --- a/internal/buildengine/languageplugin/rust_plugin.go +++ b/internal/buildengine/languageplugin/rust_plugin.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/alecthomas/kong" + "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/internal/builderrors" "github.com/TBD54566975/ftl/internal/exec" @@ -30,7 +31,7 @@ func newRustPlugin(ctx context.Context) *rustPlugin { func (p *rustPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) { return moduleconfig.CustomDefaults{ - Build: "cargo build", + Build: optional.Some("cargo build"), DeployDir: "_ftl/target/debug", Watch: []string{"**/*.rs", "Cargo.toml", "Cargo.lock"}, }, nil @@ -48,7 +49,8 @@ func (p *rustPlugin) GetDependencies(ctx context.Context, config moduleconfig.Mo return nil, fmt.Errorf("not implemented") } -func buildRust(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { +func buildRust(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { + config := bctx.Config.Abs() logger := log.FromContext(ctx) logger.Debugf("Using build command '%s'", config.Build) err := exec.Command(ctx, log.Debug, config.Dir+"/_ftl", "bash", "-c", config.Build).RunBuffered(ctx) diff --git a/internal/moduleconfig/moduleconfig.go b/internal/moduleconfig/moduleconfig.go index ff138f0584..745db87a51 100644 --- a/internal/moduleconfig/moduleconfig.go +++ b/internal/moduleconfig/moduleconfig.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/BurntSushi/toml" + "github.com/alecthomas/types/optional" "github.com/go-viper/mapstructure/v2" "github.com/TBD54566975/ftl/internal/slices" @@ -51,10 +52,10 @@ type AbsModuleConfig ModuleConfig type UnvalidatedModuleConfig ModuleConfig type CustomDefaults struct { - Build string DeployDir string - GeneratedSchemaDir string Watch []string + Build optional.Option[string] + GeneratedSchemaDir optional.Option[string] // only the root keys in LanguageConfig are used to find missing values that can be defaulted LanguageConfig map[string]any `toml:"-"` @@ -138,14 +139,14 @@ func (c UnvalidatedModuleConfig) FillDefaultsAndValidate(customDefaults CustomDe } // Custom defaults - if c.Build == "" { - c.Build = customDefaults.Build + if defaultValue, ok := customDefaults.Build.Get(); ok && c.Build == "" { + c.Build = defaultValue } if c.DeployDir == "" { c.DeployDir = customDefaults.DeployDir } - if c.GeneratedSchemaDir == "" { - c.GeneratedSchemaDir = customDefaults.GeneratedSchemaDir + if defaultValue, ok := customDefaults.GeneratedSchemaDir.Get(); ok && c.GeneratedSchemaDir == "" { + c.GeneratedSchemaDir = defaultValue } if c.Watch == nil { c.Watch = customDefaults.Watch diff --git a/internal/moduleconfig/moduleconfig_test.go b/internal/moduleconfig/moduleconfig_test.go index e9a1b372cf..00fbfd055c 100644 --- a/internal/moduleconfig/moduleconfig_test.go +++ b/internal/moduleconfig/moduleconfig_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" ) func TestDefaulting(t *testing.T) { @@ -21,9 +22,9 @@ func TestDefaulting(t *testing.T) { Language: "test", }, defaults: CustomDefaults{ - Build: "build", + Build: optional.Some("build"), DeployDir: "deploydir", - GeneratedSchemaDir: "generatedschemadir", + GeneratedSchemaDir: optional.Some("generatedschemadir"), Watch: []string{"a", "b", "c"}, }, expected: ModuleConfig{ @@ -52,9 +53,9 @@ func TestDefaulting(t *testing.T) { }, }, defaults: CustomDefaults{ - Build: "build", + Build: optional.Some("build"), DeployDir: "deploydir", - GeneratedSchemaDir: "generatedschemadir", + GeneratedSchemaDir: optional.Some("generatedschemadir"), Watch: []string{"a", "b", "c"}, }, expected: ModuleConfig{ From bd2541fa5ed0977e2a785fa49cc76a360a42cb7a Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 10 Oct 2024 13:06:35 +1100 Subject: [PATCH 02/13] clean up --- .../languageplugin/external_plugin.go | 40 ++++++++++++------- .../languageplugin/external_plugin_client.go | 6 --- .../languageplugin/external_plugin_test.go | 28 ++++++------- .../buildengine/languageplugin/java_plugin.go | 2 - 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/internal/buildengine/languageplugin/external_plugin.go b/internal/buildengine/languageplugin/external_plugin.go index 99b74a05d2..78e7e060d6 100644 --- a/internal/buildengine/languageplugin/external_plugin.go +++ b/internal/buildengine/languageplugin/external_plugin.go @@ -234,6 +234,7 @@ func (p *externalPlugin) run(ctx context.Context) { // build counter is used to generate build request ids var contextCounter = 0 + // can not scope logger initially without knowing module name logger := log.FromContext(ctx) for { @@ -243,9 +244,13 @@ func (p *externalPlugin) run(ctx context.Context) { switch c := cmd.(type) { case externalBuildCommand: // update state - projectRoot = c.projectRoot + contextCounter++ bctx = c.BuildContext + projectRoot = c.projectRoot + + // module name may have changed, update logger scope logger = log.FromContext(ctx).Scope(bctx.Config.Module) + if _, ok := activeBuildCmd.Get(); ok { c.result <- either.RightOf[BuildResult](fmt.Errorf("build already in progress")) continue @@ -256,19 +261,23 @@ func (p *externalPlugin) run(ctx context.Context) { continue } - activeBuildCmd = optional.Some[externalBuildCommand](c) - contextCounter++ + schemaProto := bctx.Schema.ToProto().(*schemapb.Schema) //nolint:forcetypeassert if streamChan != nil { // tell plugin about new build context so that it rebuilds in existing build stream - p.client.buildContextUpdated(ctx, connect.NewRequest(&langpb.BuildContextUpdatedRequest{ + _, err = p.client.buildContextUpdated(ctx, connect.NewRequest(&langpb.BuildContextUpdatedRequest{ BuildContext: &langpb.BuildContext{ - Id: contextId(bctx.Config, contextCounter), + Id: contextID(bctx.Config, contextCounter), ModuleConfig: configProto, - Schema: bctx.Schema.ToProto().(*schemapb.Schema), //nolint:forcetypeassert + Schema: schemaProto, Dependencies: bctx.Dependencies, }, })) + if err != nil { + c.result <- either.RightOf[BuildResult](err) + continue + } + activeBuildCmd = optional.Some[externalBuildCommand](c) continue } @@ -276,16 +285,17 @@ func (p *externalPlugin) run(ctx context.Context) { ProjectPath: projectRoot, RebuildAutomatically: c.rebuildAutomatically, BuildContext: &langpb.BuildContext{ - Id: contextId(bctx.Config, contextCounter), + Id: contextID(bctx.Config, contextCounter), ModuleConfig: configProto, - Schema: bctx.Schema.ToProto().(*schemapb.Schema), //nolint:forcetypeassert + Schema: schemaProto, Dependencies: bctx.Dependencies, }, })) if err != nil { - // TODO: error + c.result <- either.RightOf[BuildResult](err) continue } + activeBuildCmd = optional.Some[externalBuildCommand](c) streamChan = newStreamChan streamCancel = newCancelFunc } @@ -294,6 +304,7 @@ func (p *externalPlugin) run(ctx context.Context) { case e := <-streamChan: if e == nil { streamChan = nil + streamCancel = nil continue } @@ -311,17 +322,18 @@ func (p *externalPlugin) run(ctx context.Context) { case *langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure: streamEnded := false cmdEnded := false - result, eventContextId, isAutomaticRebuild := getBuildSuccessOrFailure(e) + result, eventContextID, isAutomaticRebuild := getBuildSuccessOrFailure(e) if activeBuildCmd.Ok() == isAutomaticRebuild { logger.Debugf("ignoring automatic rebuild while expecting explicit build") continue - } else if eventContextId != contextId(bctx.Config, contextCounter) { - logger.Debugf("received build for outdated context %q; expected %q", eventContextId, contextId(bctx.Config, contextCounter)) + } else if eventContextID != contextID(bctx.Config, contextCounter) { + logger.Debugf("received build for outdated context %q; expected %q", eventContextID, contextID(bctx.Config, contextCounter)) continue } streamEnded, cmdEnded = p.handleBuildResult(bctx.Config.Module, result, activeBuildCmd) if streamEnded { streamCancel() + streamCancel = nil streamChan = nil } if cmdEnded { @@ -340,7 +352,7 @@ func (p *externalPlugin) run(ctx context.Context) { // getBuildSuccessOrFailure takes a BuildFailure or BuildSuccess event and returns the shared fields and an either wrapped result. // This makes it easier to have some shared logic for both event types. -func getBuildSuccessOrFailure(e *langpb.BuildEvent) (result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], contextId string, isAutomaticRebuild bool) { +func getBuildSuccessOrFailure(e *langpb.BuildEvent) (result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], contextID string, isAutomaticRebuild bool) { switch e := e.Event.(type) { case *langpb.BuildEvent_BuildSuccess: return either.LeftOf[*langpb.BuildEvent_BuildFailure](e), e.BuildSuccess.ContextId, e.BuildSuccess.IsAutomaticRebuild @@ -415,6 +427,6 @@ func buildResultFromProto(result either.Either[*langpb.BuildEvent_BuildSuccess, } } -func contextId(config moduleconfig.ModuleConfig, counter int) string { +func contextID(config moduleconfig.ModuleConfig, counter int) string { return fmt.Sprintf("%v-%v", config.Module, counter) } diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go index 124b9a0126..dfdc0201be 100644 --- a/internal/buildengine/languageplugin/external_plugin_client.go +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -16,8 +16,6 @@ import ( "github.com/TBD54566975/ftl/internal/rpc" ) -// TODO: rename all this! - type streamCancelFunc func() type externalPluginClient interface { @@ -26,7 +24,6 @@ type externalPluginClient interface { moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) - // TODO: when not watching, does plugin need a way of closing the stream / chan? build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan *langpb.BuildEvent, streamCancelFunc, error) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) @@ -53,10 +50,7 @@ func newExternalPluginImpl(ctx context.Context, bind *url.URL, language string) // Start launches the plugin and blocks until the plugin is ready. func (p *externalPluginImpl) start(ctx context.Context, bind *url.URL, language string) error { - // TODO: think more about whether this is a good log level - // TODO: think more about whether cmd's path should be the current directory, or the module's cmdName := "ftl-language-" + language - // TODO: document says that we pass in dir... but I dont think we need to anymore p.cmd = exec.Command(ctx, log.Debug, ".", cmdName, "--bind", bind.String()) _, err := exec.LookPath(cmdName) if err != nil { diff --git a/internal/buildengine/languageplugin/external_plugin_test.go b/internal/buildengine/languageplugin/external_plugin_test.go index e68a9d78ec..6c6b4c1ec1 100644 --- a/internal/buildengine/languageplugin/external_plugin_test.go +++ b/internal/buildengine/languageplugin/external_plugin_test.go @@ -21,7 +21,7 @@ import ( type testBuildContext struct { BuildContext - ContextId string + ContextID string IsRebuild bool } @@ -90,7 +90,7 @@ func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Reque } p.latestBuildContext.Store(testBuildContext{ BuildContext: bctx, - ContextId: req.Msg.BuildContext.Id, + ContextID: req.Msg.BuildContext.Id, IsRebuild: false, }) return p.buildEvents, func() {}, nil @@ -103,7 +103,7 @@ func (p *mockExternalPluginClient) buildContextUpdated(ctx context.Context, req } p.latestBuildContext.Store(testBuildContext{ BuildContext: bctx, - ContextId: req.Msg.BuildContext.Id, + ContextID: req.Msg.BuildContext.Id, IsRebuild: true, }) return connect.NewResponse(&langpb.BuildContextUpdatedResponse{}), nil @@ -229,7 +229,7 @@ func TestSimultaneousBuild(t *testing.T) { assert.Contains(t, result.Get().Error(), "build already in progress") } -func TestMismatchedBuildContextId(t *testing.T) { +func TestMismatchedBuildContextID(t *testing.T) { t.Parallel() ctx, plugin, mockImpl, bctx := setUp() @@ -240,11 +240,11 @@ func TestMismatchedBuildContextId(t *testing.T) { mockImpl.buildEvents <- buildEventWithBuildError("fake", false, "this is not the result you are looking for") // send automatic rebuild result for the same context id (should be ignored) - realId := mockImpl.latestBuildContext.Load().ContextId - mockImpl.buildEvents <- buildEventWithBuildError(realId, true, "this is not the result you are looking for") + realID := mockImpl.latestBuildContext.Load().ContextID + mockImpl.buildEvents <- buildEventWithBuildError(realID, true, "this is not the result you are looking for") // send real build result - mockImpl.buildEvents <- buildEventWithBuildError(realId, false, "this is the correct result") + mockImpl.buildEvents <- buildEventWithBuildError(realID, false, "this is the correct result") // check result checkResult(t, <-result, "this is the correct result") @@ -259,7 +259,7 @@ func TestRebuilds(t *testing.T) { // send first build result testBuildCtx := mockImpl.latestBuildContext.Load() - mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextId, false, "first build") + mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextID, false, "first build") // check result checkResult(t, <-result, "first build") @@ -273,7 +273,7 @@ func TestRebuilds(t *testing.T) { // send rebuild result testBuildCtx = mockImpl.latestBuildContext.Load() assert.Equal(t, testBuildCtx.Schema, sch, "schema should have been updated") - mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextId, false, "second build") + mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextID, false, "second build") // check rebuild result checkResult(t, <-result, "second build") @@ -299,7 +299,7 @@ func TestAutomaticRebuilds(t *testing.T) { // send first build result time.Sleep(500 * time.Millisecond) buildCtx := mockImpl.latestBuildContext.Load() - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextId, false, "first build") + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, false, "first build") // check result checkResult(t, <-result, "first build") @@ -316,9 +316,9 @@ func TestAutomaticRebuilds(t *testing.T) { mockImpl.buildEvents <- &langpb.BuildEvent{ Event: &langpb.BuildEvent_AutoRebuildStarted{}, } - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextId, true, "first real auto rebuild") + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, true, "first real auto rebuild") // plugin sends auto rebuild events again (this time with no rebuild started event) - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextId, true, "second real auto rebuild") + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, true, "second real auto rebuild") // confirm that auto rebuilds events were published events := eventsFromChannel(updates) @@ -344,11 +344,11 @@ func eventsFromChannel(updates chan PluginEvent) []PluginEvent { } } -func buildEventWithBuildError(contextId string, isAutomaticRebuild bool, msg string) *langpb.BuildEvent { +func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg string) *langpb.BuildEvent { return &langpb.BuildEvent{ Event: &langpb.BuildEvent_BuildFailure{ BuildFailure: &langpb.BuildFailure{ - ContextId: contextId, + ContextId: contextID, IsAutomaticRebuild: isAutomaticRebuild, Errors: langpb.ErrorsToProto([]builderrors.Error{ { diff --git a/internal/buildengine/languageplugin/java_plugin.go b/internal/buildengine/languageplugin/java_plugin.go index d95ba19298..1d163c86dc 100644 --- a/internal/buildengine/languageplugin/java_plugin.go +++ b/internal/buildengine/languageplugin/java_plugin.go @@ -249,8 +249,6 @@ func extractKotlinFTLImports(self, dir string) ([]string, error) { } func buildJava(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { - // TODO: add back - // Deploy: config := bctx.Config.Abs() logger := log.FromContext(ctx) javaConfig, err := loadJavaConfig(config.LanguageConfig, config.Language) From 3b11d67cfe1e6fc260b268627b9fa8cf131e102a Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 10 Oct 2024 14:20:36 +1100 Subject: [PATCH 03/13] allow build stream to break --- .../languageplugin/external_plugin.go | 20 ++++- .../languageplugin/external_plugin_client.go | 14 ++-- .../languageplugin/external_plugin_test.go | 75 ++++++++++++++++--- 3 files changed, 89 insertions(+), 20 deletions(-) diff --git a/internal/buildengine/languageplugin/external_plugin.go b/internal/buildengine/languageplugin/external_plugin.go index 78e7e060d6..2de35d1632 100644 --- a/internal/buildengine/languageplugin/external_plugin.go +++ b/internal/buildengine/languageplugin/external_plugin.go @@ -224,7 +224,7 @@ func (p *externalPlugin) run(ctx context.Context) { // if a current build stream is active, this is non-nil // this does not indicate if the stream is listening to automatic rebuilds - var streamChan chan *langpb.BuildEvent + var streamChan chan either.Either[*langpb.BuildEvent, error] var streamCancel streamCancelFunc // if an explicit build command is active, this is non-nil @@ -301,13 +301,27 @@ func (p *externalPlugin) run(ctx context.Context) { } // Receive messages from the current build stream - case e := <-streamChan: - if e == nil { + case eitherResult := <-streamChan: + if eitherResult == nil { streamChan = nil streamCancel = nil continue } + var e *langpb.BuildEvent + switch r := eitherResult.(type) { + case either.Left[*langpb.BuildEvent, error]: + e = r.Get() + case either.Right[*langpb.BuildEvent, error]: + // Stream failed + if c, ok := activeBuildCmd.Get(); ok { + c.result <- either.RightOf[BuildResult](r.Get()) + activeBuildCmd = optional.None[externalBuildCommand]() + } + streamCancel = nil + streamChan = nil + continue + } switch event := e.Event.(type) { case *langpb.BuildEvent_LogMessage: logger.Logf(langpb.LogLevelFromProto(event.LogMessage.Level), "%s", event.LogMessage.Message) diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go index dfdc0201be..ebdfee3e7a 100644 --- a/internal/buildengine/languageplugin/external_plugin_client.go +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -7,6 +7,7 @@ import ( "syscall" "connectrpc.com/connect" + "github.com/alecthomas/types/either" "github.com/jpillora/backoff" langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" @@ -24,7 +25,7 @@ type externalPluginClient interface { moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) - build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan *langpb.BuildEvent, streamCancelFunc, error) + build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) kill() error @@ -125,13 +126,13 @@ func (p *externalPluginImpl) getDependencies(ctx context.Context, req *connect.R return p.client.GetDependencies(ctx, req) } -func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan *langpb.BuildEvent, streamCancelFunc, error) { +func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) { stream, err := p.client.Build(ctx, req) if err != nil { return nil, nil, err } - streamChan := make(chan *langpb.BuildEvent, 64) + streamChan := make(chan either.Either[*langpb.BuildEvent, error], 64) go streamToChan(stream, streamChan) return streamChan, func() { @@ -140,9 +141,12 @@ func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[lan }, err } -func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch chan *langpb.BuildEvent) { +func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch chan either.Either[*langpb.BuildEvent, error]) { for stream.Receive() { - ch <- stream.Msg() + ch <- either.LeftOf[error](stream.Msg()) + } + if err := stream.Err(); err != nil { + ch <- either.RightOf[*langpb.BuildEvent](err) } close(ch) } diff --git a/internal/buildengine/languageplugin/external_plugin_test.go b/internal/buildengine/languageplugin/external_plugin_test.go index 6c6b4c1ec1..f09d96d558 100644 --- a/internal/buildengine/languageplugin/external_plugin_test.go +++ b/internal/buildengine/languageplugin/external_plugin_test.go @@ -2,6 +2,7 @@ package languageplugin import ( "context" + "fmt" "testing" "time" @@ -28,7 +29,7 @@ type testBuildContext struct { type mockExternalPluginClient struct { flags []*langpb.GetCreateModuleFlagsResponse_Flag - buildEvents chan *langpb.BuildEvent + buildEvents chan either.Either[*langpb.BuildEvent, error] latestBuildContext atomic.Value[testBuildContext] } @@ -36,7 +37,7 @@ var _ externalPluginClient = &mockExternalPluginClient{} func newMockExternalPlugin() *mockExternalPluginClient { return &mockExternalPluginClient{ - buildEvents: make(chan *langpb.BuildEvent, 64), + buildEvents: make(chan either.Either[*langpb.BuildEvent, error], 64), } } @@ -83,7 +84,7 @@ func buildContextFromProto(proto *langpb.BuildContext) (BuildContext, error) { }, nil } -func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan *langpb.BuildEvent, streamCancelFunc, error) { +func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) { bctx, err := buildContextFromProto(req.Msg.BuildContext) if err != nil { return nil, nil, err @@ -290,14 +291,14 @@ func TestAutomaticRebuilds(t *testing.T) { result := beginBuild(ctx, plugin, bctx, true) // plugin sends auto rebuild has started event (should be ignored) - mockImpl.buildEvents <- &langpb.BuildEvent{ + mockImpl.buildEvents <- either.LeftOf[error](&langpb.BuildEvent{ Event: &langpb.BuildEvent_AutoRebuildStarted{}, - } + }) // plugin sends auto rebuild event (should be ignored) mockImpl.buildEvents <- buildEventWithBuildError("fake", true, "auto rebuild to ignore") // send first build result - time.Sleep(500 * time.Millisecond) + time.Sleep(200 * time.Millisecond) buildCtx := mockImpl.latestBuildContext.Load() mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, false, "first build") @@ -313,9 +314,9 @@ func TestAutomaticRebuilds(t *testing.T) { } // plugin sends auto rebuild events - mockImpl.buildEvents <- &langpb.BuildEvent{ + mockImpl.buildEvents <- either.LeftOf[error](&langpb.BuildEvent{ Event: &langpb.BuildEvent_AutoRebuildStarted{}, - } + }) mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, true, "first real auto rebuild") // plugin sends auto rebuild events again (this time with no rebuild started event) mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, true, "second real auto rebuild") @@ -328,6 +329,42 @@ func TestAutomaticRebuilds(t *testing.T) { checkAutoRebuildResult(t, events[2], "second real auto rebuild") } +func TestBrokenBuildStream(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + updates := make(chan PluginEvent, 64) + plugin.Updates().Subscribe(updates) + + // build and activate automatic rebuilds + result := beginBuild(ctx, plugin, bctx, true) + + // break the stream + breakStream(mockImpl) + checkStreamError(t, <-result) + + // build again + result = beginBuild(ctx, plugin, bctx, true) + + // send build result + buildCtx := mockImpl.latestBuildContext.Load() + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, false, "first build") + checkResult(t, <-result, "first build") + + // break the stream + breakStream(mockImpl) + + // build again + result = beginBuild(ctx, plugin, bctx, true) + // confirm that a Build call was made instead of a BuildContextUpdated call + assert.False(t, mockImpl.latestBuildContext.Load().IsRebuild, "after breaking the stream, FTL should send a Build call instead of a BuildContextUpdated call") + + // send build result + buildCtx = mockImpl.latestBuildContext.Load() + mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, false, "second build") + checkResult(t, <-result, "second build") +} + func eventsFromChannel(updates chan PluginEvent) []PluginEvent { // wait a bit to let events get published time.Sleep(200 * time.Millisecond) @@ -344,8 +381,8 @@ func eventsFromChannel(updates chan PluginEvent) []PluginEvent { } } -func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg string) *langpb.BuildEvent { - return &langpb.BuildEvent{ +func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg string) either.Either[*langpb.BuildEvent, error] { + return either.LeftOf[error](&langpb.BuildEvent{ Event: &langpb.BuildEvent_BuildFailure{ BuildFailure: &langpb.BuildFailure{ ContextId: contextID, @@ -357,7 +394,7 @@ func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg str }), }, }, - } + }) } func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, autoRebuild bool) chan either.Either[BuildResult, error] { @@ -371,10 +408,16 @@ func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, } }() // sleep to make sure impl has received the build context - time.Sleep(500 * time.Millisecond) + time.Sleep(300 * time.Millisecond) return result } +func breakStream(client *mockExternalPluginClient) { + client.buildEvents <- either.RightOf[*langpb.BuildEvent](fmt.Errorf("fake a broken stream")) + close(client.buildEvents) + client.buildEvents = make(chan either.Either[*langpb.BuildEvent, error], 64) +} + func checkResult(t *testing.T, r either.Either[BuildResult, error], expectedMsg string) { t.Helper() left, ok := r.(either.Left[BuildResult, error]) @@ -385,6 +428,14 @@ func checkResult(t *testing.T, r either.Either[BuildResult, error], expectedMsg assert.Equal(t, buildResult.Errors[0].Msg, expectedMsg) } +func checkStreamError(t *testing.T, r either.Either[BuildResult, error]) { + t.Helper() + right, ok := r.(either.Right[BuildResult, error]) + assert.True(t, ok, "expected error result, got %v", r) + + assert.Equal(t, right.Get(), fmt.Errorf("fake a broken stream")) +} + func checkAutoRebuildResult(t *testing.T, e PluginEvent, expectedMsg string) { t.Helper() event, ok := e.(AutoRebuildEndedEvent) From a5f18d4ca5f8e0fd94a43852e20bf474cd872e4f Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 09:20:58 +1100 Subject: [PATCH 04/13] use bind allocator --- internal/buildengine/languageplugin/plugin.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index 351277dacc..6b77827f5d 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -3,7 +3,6 @@ package languageplugin import ( "context" "fmt" - "net/url" "os" "path/filepath" "time" @@ -112,12 +111,7 @@ func New(ctx context.Context, bindAllocator *bind.BindAllocator, language string case "rust": return newRustPlugin(ctx), nil default: - // TODO: get bind url - bind, err := url.Parse("localhost:80123") - if err != nil { - return nil, fmt.Errorf("could not parse bind url: %w", err) - } - return newExternalPlugin(ctx, bind, language) + return newExternalPlugin(ctx, bindAllocator.Next(), language) } } From bd2bfd59685857d0065c5ddb6434b0461ff8f708 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 09:36:30 +1100 Subject: [PATCH 05/13] comments and lint --- frontend/cli/cmd_new.go | 2 ++ frontend/cli/main.go | 6 ++-- .../languageplugin/external_plugin.go | 2 ++ .../languageplugin/external_plugin_client.go | 36 +++++++++++++++---- .../languageplugin/external_plugin_test.go | 3 +- internal/buildengine/languageplugin/plugin.go | 5 +-- 6 files changed, 42 insertions(+), 12 deletions(-) diff --git a/frontend/cli/cmd_new.go b/frontend/cli/cmd_new.go index 7ac6dfd646..18204126b5 100644 --- a/frontend/cli/cmd_new.go +++ b/frontend/cli/cmd_new.go @@ -35,6 +35,8 @@ type newCmd struct { // - help text (ftl new go --help) // - default values // - environment variable overrides +// +// Language plugins take time to launch, so we return the one we created so it can be reused in Run(). func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) (optionalPlugin optional.Option[languageplugin.LanguagePlugin], err error) { if len(args) < 2 { return optionalPlugin, nil diff --git a/frontend/cli/main.go b/frontend/cli/main.go index f042bf5e01..70328f4a38 100644 --- a/frontend/cli/main.go +++ b/frontend/cli/main.go @@ -85,14 +85,16 @@ func main() { csm := ¤tStatusManager{} app := createKongApplication(&cli, csm) - languagePlugin, err := prepareNewCmd(ctx, app, os.Args[1:]) + + // Dynamically update the kong app with language specific flags for the "ftl new" command. + languagePlugin, err := prepareNewCmd(log.ContextWithNewDefaultLogger(ctx), app, os.Args[1:]) app.FatalIfErrorf(err) kctx, err := app.Parse(os.Args[1:]) app.FatalIfErrorf(err) if plugin, ok := languagePlugin.Get(); ok { - // for "ftl new" command, we only need to create the language plugin once + // Plugins take time to launch, so we bind the "ftl new" plugin to the kong context. kctx.BindTo(plugin, (*languageplugin.LanguagePlugin)(nil)) } diff --git a/internal/buildengine/languageplugin/external_plugin.go b/internal/buildengine/languageplugin/external_plugin.go index 2de35d1632..87772b4cf7 100644 --- a/internal/buildengine/languageplugin/external_plugin.go +++ b/internal/buildengine/languageplugin/external_plugin.go @@ -132,6 +132,8 @@ func (p *externalPlugin) CreateModule(ctx context.Context, projConfig projectcon Name: moduleConfig.Module, Path: moduleConfig.Dir, ProjectConfig: &langpb.ProjectConfig{ + Path: projConfig.Path, + Name: projConfig.Name, NoGit: projConfig.NoGit, Hermit: projConfig.Hermit, }, diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go index ebdfee3e7a..042da0b5d5 100644 --- a/internal/buildengine/languageplugin/external_plugin_client.go +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -106,24 +106,42 @@ func (p *externalPluginImpl) ping(ctx context.Context) error { } func (p *externalPluginImpl) kill() error { - // TODO: cancel run() ctx - return p.cmd.Kill(syscall.SIGINT) + if err := p.cmd.Kill(syscall.SIGINT); err != nil { + return fmt.Errorf("failed to kill language plugin: %w", err) + } + return nil } func (p *externalPluginImpl) getCreateModuleFlags(ctx context.Context, req *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) { - return p.client.GetCreateModuleFlags(ctx, req) + resp, err := p.client.GetCreateModuleFlags(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get create module flags from plugin: %w", err) + } + return resp, nil } func (p *externalPluginImpl) moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) { - return p.client.ModuleConfigDefaults(ctx, req) + resp, err := p.client.ModuleConfigDefaults(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get module config defaults from plugin: %w", err) + } + return resp, nil } func (p *externalPluginImpl) createModule(ctx context.Context, req *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) { - return p.client.CreateModule(ctx, req) + resp, err := p.client.CreateModule(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to create module: %w", err) + } + return resp, nil } func (p *externalPluginImpl) getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) { - return p.client.GetDependencies(ctx, req) + resp, err := p.client.GetDependencies(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to get dependencies from plugin: %w", err) + } + return resp, nil } func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) { @@ -152,5 +170,9 @@ func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch c } func (p *externalPluginImpl) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) { - return p.client.BuildContextUpdated(ctx, req) + resp, err := p.client.BuildContextUpdated(ctx, req) + if err != nil { + return nil, fmt.Errorf("failed to send updated build context to plugin: %w", err) + } + return resp, nil } diff --git a/internal/buildengine/languageplugin/external_plugin_test.go b/internal/buildengine/languageplugin/external_plugin_test.go index f09d96d558..47fc1f54fa 100644 --- a/internal/buildengine/languageplugin/external_plugin_test.go +++ b/internal/buildengine/languageplugin/external_plugin_test.go @@ -65,7 +65,7 @@ func (p *mockExternalPluginClient) getDependencies(context.Context, *connect.Req func buildContextFromProto(proto *langpb.BuildContext) (BuildContext, error) { sch, err := schema.FromProto(proto.Schema) if err != nil { - return BuildContext{}, err + return BuildContext{}, fmt.Errorf("could not load schema from build context proto: %w", err) } return BuildContext{ Schema: sch, @@ -133,6 +133,7 @@ func setUp() (context.Context, *externalPlugin, *mockExternalPluginClient, Build } func TestCreateModuleFlags(t *testing.T) { + t.Parallel() for _, tt := range []struct { protoFlags []*langpb.GetCreateModuleFlagsResponse_Flag expectedFlags []*kong.Flag diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index 6b77827f5d..b7e7fe7300 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -60,7 +60,9 @@ type AutoRebuildEndedEvent struct { func (AutoRebuildEndedEvent) pluginEvent() {} func (e AutoRebuildEndedEvent) ModuleName() string { return e.Module } -// TODO: docs +// BuildContext contains contextual information needed to build. +// +// Any change to the build context would require a new build. type BuildContext struct { Config moduleconfig.ModuleConfig Schema *schema.Schema @@ -94,7 +96,6 @@ type LanguagePlugin interface { // Build builds the module with the latest config and schema. // In dev mode, plugin is responsible for automatically rebuilding as relevant files within the module change, // and publishing these automatic builds updates to Updates(). - // TODO: build env needed? Build(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool) (BuildResult, error) // Kill stops the plugin and cleans up any resources. From 2c1bc3536bc72cc5493e0c6ff19c07ed5e05eb48 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 10:11:19 +1100 Subject: [PATCH 06/13] engine responds to invalidated dependencies --- internal/buildengine/build.go | 8 ++++++++ internal/buildengine/engine.go | 10 +++++++++- internal/buildengine/languageplugin/plugin.go | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/internal/buildengine/build.go b/internal/buildengine/build.go index 7a629b369a..7a2ca68a31 100644 --- a/internal/buildengine/build.go +++ b/internal/buildengine/build.go @@ -17,9 +17,13 @@ import ( "github.com/TBD54566975/ftl/internal/schema" ) +var invalidateDependenciesError = errors.New("dependencies need to be updated") + // Build a module in the given directory given the schema and module config. // // A lock file is used to ensure that only one build is running at a time. +// +// Returns invalidateDependenciesError if the build failed due to a change in dependencies. func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectRootDir string, bctx languageplugin.BuildContext, buildEnv []string, devMode bool) (moduleSchema *schema.Module, deploy []string, err error) { logger := log.FromContext(ctx).Module(bctx.Config.Module).Scope("build") ctx = log.ContextWithLogger(ctx, logger) @@ -46,6 +50,10 @@ func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherR result = eitherResult.Get() } + if result.InvalidateDependencies { + return nil, nil, invalidateDependenciesError + } + var errs []error for _, e := range result.Errors { if e.Level == builderrors.WARN { diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index f38b652c74..b90aa67d88 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -633,7 +633,6 @@ func (e *Engine) BuildAndDeploy(ctx context.Context, replicas int32, waitForDepl type buildCallback func(ctx context.Context, module Module) error func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, moduleNames ...string) error { - if len(moduleNames) == 0 { e.moduleMetas.Range(func(name string, meta moduleMeta) bool { moduleNames = append(moduleNames, name) @@ -820,6 +819,15 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ }, e.buildEnv, e.devMode) if err != nil { terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) + if errors.Is(err, invalidateDependenciesError) { + go func() { + logger := log.FromContext(ctx) + err := e.BuildAndDeploy(ctx, 1, true, moduleName) + e.reportBuildFailed(err) + terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) + logger.Errorf(err, "Build and deploy failed for module %q", moduleName) + }() + } return err } // update files to deploy diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index b7e7fe7300..83188c3e42 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -31,7 +31,7 @@ type BuildResult struct { // Files to deploy, relative to the module config's DeployDir Deploy []string - // Docs + // Whether the module needs to recalculate its dependencies InvalidateDependencies bool } From 0758e71bd59b7c979021f3c9bd657f94b8bcd604 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 11:01:25 +1100 Subject: [PATCH 07/13] fix race conditions in test --- .../languageplugin/external_plugin_test.go | 65 ++++++++++++------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/internal/buildengine/languageplugin/external_plugin_test.go b/internal/buildengine/languageplugin/external_plugin_test.go index 47fc1f54fa..0381490047 100644 --- a/internal/buildengine/languageplugin/external_plugin_test.go +++ b/internal/buildengine/languageplugin/external_plugin_test.go @@ -3,6 +3,7 @@ package languageplugin import ( "context" "fmt" + "sync" "testing" "time" @@ -29,7 +30,10 @@ type testBuildContext struct { type mockExternalPluginClient struct { flags []*langpb.GetCreateModuleFlagsResponse_Flag - buildEvents chan either.Either[*langpb.BuildEvent, error] + // atomic.Value does not allow us to atomically publish, close and replace the chan + buildEventsLock *sync.Mutex + buildEvents chan either.Either[*langpb.BuildEvent, error] + latestBuildContext atomic.Value[testBuildContext] } @@ -37,7 +41,8 @@ var _ externalPluginClient = &mockExternalPluginClient{} func newMockExternalPlugin() *mockExternalPluginClient { return &mockExternalPluginClient{ - buildEvents: make(chan either.Either[*langpb.BuildEvent, error], 64), + buildEventsLock: &sync.Mutex{}, + buildEvents: make(chan either.Either[*langpb.BuildEvent, error], 64), } } @@ -85,6 +90,9 @@ func buildContextFromProto(proto *langpb.BuildContext) (BuildContext, error) { } func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) { + p.buildEventsLock.Lock() + defer p.buildEventsLock.Unlock() + bctx, err := buildContextFromProto(req.Msg.BuildContext) if err != nil { return nil, nil, err @@ -239,14 +247,14 @@ func TestMismatchedBuildContextID(t *testing.T) { result := beginBuild(ctx, plugin, bctx, false) // send mismatched build result (ie: a different build attempt completing) - mockImpl.buildEvents <- buildEventWithBuildError("fake", false, "this is not the result you are looking for") + mockImpl.publishBuildEvent(buildEventWithBuildError("fake", false, "this is not the result you are looking for")) // send automatic rebuild result for the same context id (should be ignored) realID := mockImpl.latestBuildContext.Load().ContextID - mockImpl.buildEvents <- buildEventWithBuildError(realID, true, "this is not the result you are looking for") + mockImpl.publishBuildEvent(buildEventWithBuildError(realID, true, "this is not the result you are looking for")) // send real build result - mockImpl.buildEvents <- buildEventWithBuildError(realID, false, "this is the correct result") + mockImpl.publishBuildEvent(buildEventWithBuildError(realID, false, "this is the correct result")) // check result checkResult(t, <-result, "this is the correct result") @@ -261,7 +269,7 @@ func TestRebuilds(t *testing.T) { // send first build result testBuildCtx := mockImpl.latestBuildContext.Load() - mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextID, false, "first build") + mockImpl.publishBuildEvent(buildEventWithBuildError(testBuildCtx.ContextID, false, "first build")) // check result checkResult(t, <-result, "first build") @@ -275,7 +283,7 @@ func TestRebuilds(t *testing.T) { // send rebuild result testBuildCtx = mockImpl.latestBuildContext.Load() assert.Equal(t, testBuildCtx.Schema, sch, "schema should have been updated") - mockImpl.buildEvents <- buildEventWithBuildError(testBuildCtx.ContextID, false, "second build") + mockImpl.publishBuildEvent(buildEventWithBuildError(testBuildCtx.ContextID, false, "second build")) // check rebuild result checkResult(t, <-result, "second build") @@ -292,16 +300,16 @@ func TestAutomaticRebuilds(t *testing.T) { result := beginBuild(ctx, plugin, bctx, true) // plugin sends auto rebuild has started event (should be ignored) - mockImpl.buildEvents <- either.LeftOf[error](&langpb.BuildEvent{ + mockImpl.publishBuildEvent(&langpb.BuildEvent{ Event: &langpb.BuildEvent_AutoRebuildStarted{}, }) // plugin sends auto rebuild event (should be ignored) - mockImpl.buildEvents <- buildEventWithBuildError("fake", true, "auto rebuild to ignore") + mockImpl.publishBuildEvent(buildEventWithBuildError("fake", true, "auto rebuild to ignore")) // send first build result time.Sleep(200 * time.Millisecond) buildCtx := mockImpl.latestBuildContext.Load() - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, false, "first build") + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, false, "first build")) // check result checkResult(t, <-result, "first build") @@ -315,12 +323,12 @@ func TestAutomaticRebuilds(t *testing.T) { } // plugin sends auto rebuild events - mockImpl.buildEvents <- either.LeftOf[error](&langpb.BuildEvent{ + mockImpl.publishBuildEvent(&langpb.BuildEvent{ Event: &langpb.BuildEvent_AutoRebuildStarted{}, }) - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, true, "first real auto rebuild") + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, true, "first real auto rebuild")) // plugin sends auto rebuild events again (this time with no rebuild started event) - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, true, "second real auto rebuild") + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, true, "second real auto rebuild")) // confirm that auto rebuilds events were published events := eventsFromChannel(updates) @@ -341,7 +349,7 @@ func TestBrokenBuildStream(t *testing.T) { result := beginBuild(ctx, plugin, bctx, true) // break the stream - breakStream(mockImpl) + mockImpl.breakStream() checkStreamError(t, <-result) // build again @@ -349,11 +357,11 @@ func TestBrokenBuildStream(t *testing.T) { // send build result buildCtx := mockImpl.latestBuildContext.Load() - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, false, "first build") + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, false, "first build")) checkResult(t, <-result, "first build") // break the stream - breakStream(mockImpl) + mockImpl.breakStream() // build again result = beginBuild(ctx, plugin, bctx, true) @@ -362,7 +370,7 @@ func TestBrokenBuildStream(t *testing.T) { // send build result buildCtx = mockImpl.latestBuildContext.Load() - mockImpl.buildEvents <- buildEventWithBuildError(buildCtx.ContextID, false, "second build") + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, false, "second build")) checkResult(t, <-result, "second build") } @@ -382,8 +390,8 @@ func eventsFromChannel(updates chan PluginEvent) []PluginEvent { } } -func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg string) either.Either[*langpb.BuildEvent, error] { - return either.LeftOf[error](&langpb.BuildEvent{ +func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg string) *langpb.BuildEvent { + return &langpb.BuildEvent{ Event: &langpb.BuildEvent_BuildFailure{ BuildFailure: &langpb.BuildFailure{ ContextId: contextID, @@ -395,7 +403,14 @@ func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg str }), }, }, - }) + } +} + +func (p *mockExternalPluginClient) publishBuildEvent(event *langpb.BuildEvent) { + p.buildEventsLock.Lock() + defer p.buildEventsLock.Unlock() + + p.buildEvents <- either.LeftOf[error](event) } func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, autoRebuild bool) chan either.Either[BuildResult, error] { @@ -413,10 +428,12 @@ func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, return result } -func breakStream(client *mockExternalPluginClient) { - client.buildEvents <- either.RightOf[*langpb.BuildEvent](fmt.Errorf("fake a broken stream")) - close(client.buildEvents) - client.buildEvents = make(chan either.Either[*langpb.BuildEvent, error], 64) +func (p *mockExternalPluginClient) breakStream() { + p.buildEventsLock.Lock() + defer p.buildEventsLock.Unlock() + p.buildEvents <- either.RightOf[*langpb.BuildEvent](fmt.Errorf("fake a broken stream")) + close(p.buildEvents) + p.buildEvents = make(chan either.Either[*langpb.BuildEvent, error], 64) } func checkResult(t *testing.T, r either.Either[BuildResult, error], expectedMsg string) { From ae6956bccb94cdcdbd0e76209ce8b331d8d7c889 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 11:06:22 +1100 Subject: [PATCH 08/13] update name of errInvalidateDependencies --- internal/buildengine/build.go | 4 ++-- internal/buildengine/engine.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/buildengine/build.go b/internal/buildengine/build.go index 7a2ca68a31..22f5da164e 100644 --- a/internal/buildengine/build.go +++ b/internal/buildengine/build.go @@ -17,7 +17,7 @@ import ( "github.com/TBD54566975/ftl/internal/schema" ) -var invalidateDependenciesError = errors.New("dependencies need to be updated") +var errInvalidateDependencies = errors.New("dependencies need to be updated") // Build a module in the given directory given the schema and module config. // @@ -51,7 +51,7 @@ func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherR } if result.InvalidateDependencies { - return nil, nil, invalidateDependenciesError + return nil, nil, errInvalidateDependencies } var errs []error diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index b90aa67d88..43afc67f42 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -819,7 +819,7 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ }, e.buildEnv, e.devMode) if err != nil { terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) - if errors.Is(err, invalidateDependenciesError) { + if errors.Is(err, errInvalidateDependencies) { go func() { logger := log.FromContext(ctx) err := e.BuildAndDeploy(ctx, 1, true, moduleName) From 565424b4480514ab2cbf73b3451c860d38fa67f4 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 11:34:02 +1100 Subject: [PATCH 09/13] lint again --- .../languageplugin/external_plugin.go | 110 +++++++++--------- .../languageplugin/external_plugin_client.go | 16 +-- 2 files changed, 61 insertions(+), 65 deletions(-) diff --git a/internal/buildengine/languageplugin/external_plugin.go b/internal/buildengine/languageplugin/external_plugin.go index 87772b4cf7..794d9703a4 100644 --- a/internal/buildengine/languageplugin/external_plugin.go +++ b/internal/buildengine/languageplugin/external_plugin.go @@ -24,11 +24,6 @@ import ( const launchTimeout = 10 * time.Second -//sumtype:decl -type externalPluginCommand interface { - externalPluginCmd() -} - type externalBuildCommand struct { BuildContext projectRoot string @@ -37,8 +32,6 @@ type externalBuildCommand struct { result chan either.Either[BuildResult, error] } -func (externalBuildCommand) externalPluginCmd() {} - type externalPlugin struct { client externalPluginClient @@ -46,7 +39,7 @@ type externalPlugin struct { cancel context.CancelFunc // commands to execute - commands chan externalPluginCommand + commands chan externalBuildCommand updates *pubsub.Topic[PluginEvent] } @@ -64,7 +57,7 @@ func newExternalPlugin(ctx context.Context, bind *url.URL, language string) (*ex func newExternalPluginForTesting(ctx context.Context, client externalPluginClient) *externalPlugin { plugin := &externalPlugin{ client: client, - commands: make(chan externalPluginCommand, 64), + commands: make(chan externalBuildCommand, 64), updates: pubsub.New[PluginEvent](), } @@ -77,7 +70,10 @@ func newExternalPluginForTesting(ctx context.Context, client externalPluginClien func (p *externalPlugin) Kill() error { p.cancel() - return p.client.kill() + if err := p.client.kill(); err != nil { + return fmt.Errorf("failed to kill language plugin: %w", err) + } + return nil } func (p *externalPlugin) Updates() *pubsub.Topic[PluginEvent] { @@ -87,7 +83,7 @@ func (p *externalPlugin) Updates() *pubsub.Topic[PluginEvent] { func (p *externalPlugin) GetCreateModuleFlags(ctx context.Context) ([]*kong.Flag, error) { res, err := p.client.getCreateModuleFlags(ctx, connect.NewRequest(&langpb.GetCreateModuleFlagsRequest{})) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get create module flags from plugin: %w", err) } flags := []*kong.Flag{} shorts := map[rune]string{} @@ -138,7 +134,10 @@ func (p *externalPlugin) CreateModule(ctx context.Context, projConfig projectcon Hermit: projConfig.Hermit, }, })) - return err + if err != nil { + return fmt.Errorf("failed to create module: %w", err) + } + return nil } func (p *externalPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) { @@ -146,7 +145,7 @@ func (p *externalPlugin) ModuleConfigDefaults(ctx context.Context, dir string) ( Path: dir, })) if err != nil { - return moduleconfig.CustomDefaults{}, err + return moduleconfig.CustomDefaults{}, fmt.Errorf("failed to get module config defaults from plugin: %w", err) } return moduleconfig.CustomDefaults{ DeployDir: resp.Msg.DeployDir, @@ -166,7 +165,7 @@ func (p *externalPlugin) GetDependencies(ctx context.Context, config moduleconfi ModuleConfig: configProto, })) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get dependencies from plugin: %w", err) } return resp.Msg.Modules, nil } @@ -242,50 +241,30 @@ func (p *externalPlugin) run(ctx context.Context) { for { select { // Process incoming commands - case cmd := <-p.commands: - switch c := cmd.(type) { - case externalBuildCommand: - // update state - contextCounter++ - bctx = c.BuildContext - projectRoot = c.projectRoot + case c := <-p.commands: + // update state + contextCounter++ + bctx = c.BuildContext + projectRoot = c.projectRoot - // module name may have changed, update logger scope - logger = log.FromContext(ctx).Scope(bctx.Config.Module) + // module name may have changed, update logger scope + logger = log.FromContext(ctx).Scope(bctx.Config.Module) - if _, ok := activeBuildCmd.Get(); ok { - c.result <- either.RightOf[BuildResult](fmt.Errorf("build already in progress")) - continue - } - configProto, err := protoFromModuleConfig(bctx.Config) - if err != nil { - c.result <- either.RightOf[BuildResult](err) - continue - } + if _, ok := activeBuildCmd.Get(); ok { + c.result <- either.RightOf[BuildResult](fmt.Errorf("build already in progress")) + continue + } + configProto, err := protoFromModuleConfig(bctx.Config) + if err != nil { + c.result <- either.RightOf[BuildResult](err) + continue + } - schemaProto := bctx.Schema.ToProto().(*schemapb.Schema) //nolint:forcetypeassert - - if streamChan != nil { - // tell plugin about new build context so that it rebuilds in existing build stream - _, err = p.client.buildContextUpdated(ctx, connect.NewRequest(&langpb.BuildContextUpdatedRequest{ - BuildContext: &langpb.BuildContext{ - Id: contextID(bctx.Config, contextCounter), - ModuleConfig: configProto, - Schema: schemaProto, - Dependencies: bctx.Dependencies, - }, - })) - if err != nil { - c.result <- either.RightOf[BuildResult](err) - continue - } - activeBuildCmd = optional.Some[externalBuildCommand](c) - continue - } + schemaProto := bctx.Schema.ToProto().(*schemapb.Schema) //nolint:forcetypeassert - newStreamChan, newCancelFunc, err := p.client.build(ctx, connect.NewRequest(&langpb.BuildRequest{ - ProjectPath: projectRoot, - RebuildAutomatically: c.rebuildAutomatically, + if streamChan != nil { + // tell plugin about new build context so that it rebuilds in existing build stream + _, err = p.client.buildContextUpdated(ctx, connect.NewRequest(&langpb.BuildContextUpdatedRequest{ BuildContext: &langpb.BuildContext{ Id: contextID(bctx.Config, contextCounter), ModuleConfig: configProto, @@ -294,13 +273,30 @@ func (p *externalPlugin) run(ctx context.Context) { }, })) if err != nil { - c.result <- either.RightOf[BuildResult](err) + c.result <- either.RightOf[BuildResult](fmt.Errorf("failed to send updated build context to plugin: %w", err)) continue } activeBuildCmd = optional.Some[externalBuildCommand](c) - streamChan = newStreamChan - streamCancel = newCancelFunc + continue + } + + newStreamChan, newCancelFunc, err := p.client.build(ctx, connect.NewRequest(&langpb.BuildRequest{ + ProjectPath: projectRoot, + RebuildAutomatically: c.rebuildAutomatically, + BuildContext: &langpb.BuildContext{ + Id: contextID(bctx.Config, contextCounter), + ModuleConfig: configProto, + Schema: schemaProto, + Dependencies: bctx.Dependencies, + }, + })) + if err != nil { + c.result <- either.RightOf[BuildResult](fmt.Errorf("failed to start build stream: %w", err)) + continue } + activeBuildCmd = optional.Some[externalBuildCommand](c) + streamChan = newStreamChan + streamCancel = newCancelFunc // Receive messages from the current build stream case eitherResult := <-streamChan: diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go index 042da0b5d5..f6113f8968 100644 --- a/internal/buildengine/languageplugin/external_plugin_client.go +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -107,7 +107,7 @@ func (p *externalPluginImpl) ping(ctx context.Context) error { func (p *externalPluginImpl) kill() error { if err := p.cmd.Kill(syscall.SIGINT); err != nil { - return fmt.Errorf("failed to kill language plugin: %w", err) + return err //nolint:wrapcheck } return nil } @@ -115,7 +115,7 @@ func (p *externalPluginImpl) kill() error { func (p *externalPluginImpl) getCreateModuleFlags(ctx context.Context, req *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) { resp, err := p.client.GetCreateModuleFlags(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to get create module flags from plugin: %w", err) + return nil, err //nolint:wrapcheck } return resp, nil } @@ -123,7 +123,7 @@ func (p *externalPluginImpl) getCreateModuleFlags(ctx context.Context, req *conn func (p *externalPluginImpl) moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) { resp, err := p.client.ModuleConfigDefaults(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to get module config defaults from plugin: %w", err) + return nil, err //nolint:wrapcheck } return resp, nil } @@ -131,7 +131,7 @@ func (p *externalPluginImpl) moduleConfigDefaults(ctx context.Context, req *conn func (p *externalPluginImpl) createModule(ctx context.Context, req *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) { resp, err := p.client.CreateModule(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to create module: %w", err) + return nil, err //nolint:wrapcheck } return resp, nil } @@ -139,7 +139,7 @@ func (p *externalPluginImpl) createModule(ctx context.Context, req *connect.Requ func (p *externalPluginImpl) getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) { resp, err := p.client.GetDependencies(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to get dependencies from plugin: %w", err) + return nil, err //nolint:wrapcheck } return resp, nil } @@ -147,7 +147,7 @@ func (p *externalPluginImpl) getDependencies(ctx context.Context, req *connect.R func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) { stream, err := p.client.Build(ctx, req) if err != nil { - return nil, nil, err + return nil, nil, err //nolint:wrapcheck } streamChan := make(chan either.Either[*langpb.BuildEvent, error], 64) @@ -156,7 +156,7 @@ func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[lan return streamChan, func() { stream.Close() close(streamChan) - }, err + }, nil } func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch chan either.Either[*langpb.BuildEvent, error]) { @@ -172,7 +172,7 @@ func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch c func (p *externalPluginImpl) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) { resp, err := p.client.BuildContextUpdated(ctx, req) if err != nil { - return nil, fmt.Errorf("failed to send updated build context to plugin: %w", err) + return nil, err //nolint:wrapcheck } return resp, nil } From be0852206e0af75af5afbd9b47c132828ddfd2b2 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 13:01:41 +1100 Subject: [PATCH 10/13] =?UTF-8?q?use=20result.Result=20instead=20of=20eith?= =?UTF-8?q?er.Either[=E2=80=A6,=20error]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/buildengine/build.go | 20 ++--- .../languageplugin/external_plugin.go | 76 ++++++++----------- .../languageplugin/external_plugin_client.go | 14 ++-- .../languageplugin/external_plugin_test.go | 46 +++++------ internal/buildengine/languageplugin/plugin.go | 13 +--- 5 files changed, 64 insertions(+), 105 deletions(-) diff --git a/internal/buildengine/build.go b/internal/buildengine/build.go index 22f5da164e..782563ee72 100644 --- a/internal/buildengine/build.go +++ b/internal/buildengine/build.go @@ -6,7 +6,7 @@ import ( "os" "time" - "github.com/alecthomas/types/either" + "github.com/alecthomas/types/result" "google.golang.org/protobuf/proto" "github.com/TBD54566975/ftl/internal/buildengine/languageplugin" @@ -29,25 +29,17 @@ func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectRoo ctx = log.ContextWithLogger(ctx, logger) logger.Infof("Building module") - - result, err := plugin.Build(ctx, projectRootDir, bctx, buildEnv, devMode) - if err != nil { - return handleBuildResult(ctx, bctx.Config, either.RightOf[languageplugin.BuildResult](err)) - } - return handleBuildResult(ctx, bctx.Config, either.LeftOf[error](result)) + return handleBuildResult(ctx, bctx.Config, result.From(plugin.Build(ctx, projectRootDir, bctx, buildEnv, devMode))) } // handleBuildResult processes the result of a build -func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherResult either.Either[languageplugin.BuildResult, error]) (moduleSchema *schema.Module, deploy []string, err error) { +func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherResult result.Result[languageplugin.BuildResult]) (moduleSchema *schema.Module, deploy []string, err error) { logger := log.FromContext(ctx) config := c.Abs() - var result languageplugin.BuildResult - switch eitherResult := eitherResult.(type) { - case either.Right[languageplugin.BuildResult, error]: - return nil, nil, fmt.Errorf("failed to build module: %w", eitherResult.Get()) - case either.Left[languageplugin.BuildResult, error]: - result = eitherResult.Get() + result, err := eitherResult.Result() + if err != nil { + return nil, nil, fmt.Errorf("failed to build module: %w", err) } if result.InvalidateDependencies { diff --git a/internal/buildengine/languageplugin/external_plugin.go b/internal/buildengine/languageplugin/external_plugin.go index 794d9703a4..adaa3edfae 100644 --- a/internal/buildengine/languageplugin/external_plugin.go +++ b/internal/buildengine/languageplugin/external_plugin.go @@ -11,6 +11,7 @@ import ( "github.com/alecthomas/types/either" "github.com/alecthomas/types/optional" "github.com/alecthomas/types/pubsub" + "github.com/alecthomas/types/result" "google.golang.org/protobuf/types/known/structpb" langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" @@ -29,7 +30,7 @@ type externalBuildCommand struct { projectRoot string rebuildAutomatically bool - result chan either.Either[BuildResult, error] + result chan result.Result[BuildResult] } type externalPlugin struct { @@ -176,19 +177,16 @@ func (p *externalPlugin) Build(ctx context.Context, projectRoot string, bctx Bui BuildContext: bctx, projectRoot: projectRoot, rebuildAutomatically: rebuildAutomatically, - result: make(chan either.Either[BuildResult, error]), + result: make(chan result.Result[BuildResult]), } p.commands <- cmd select { - case result := <-cmd.result: - switch result := result.(type) { - case either.Left[BuildResult, error]: - return result.Get(), nil - case either.Right[BuildResult, error]: - return BuildResult{}, result.Get() //nolint:wrapcheck - default: - panic(fmt.Sprintf("unexpected result type %T", result)) + case r := <-cmd.result: + result, err := r.Result() + if err != nil { + return BuildResult{}, err //nolint:wrapcheck } + return result, nil case <-ctx.Done(): return BuildResult{}, fmt.Errorf("error waiting for build to complete: %w", ctx.Err()) } @@ -225,7 +223,7 @@ func (p *externalPlugin) run(ctx context.Context) { // if a current build stream is active, this is non-nil // this does not indicate if the stream is listening to automatic rebuilds - var streamChan chan either.Either[*langpb.BuildEvent, error] + var streamChan chan result.Result[*langpb.BuildEvent] var streamCancel streamCancelFunc // if an explicit build command is active, this is non-nil @@ -251,12 +249,12 @@ func (p *externalPlugin) run(ctx context.Context) { logger = log.FromContext(ctx).Scope(bctx.Config.Module) if _, ok := activeBuildCmd.Get(); ok { - c.result <- either.RightOf[BuildResult](fmt.Errorf("build already in progress")) + c.result <- result.Err[BuildResult](fmt.Errorf("build already in progress")) continue } configProto, err := protoFromModuleConfig(bctx.Config) if err != nil { - c.result <- either.RightOf[BuildResult](err) + c.result <- result.Err[BuildResult](err) continue } @@ -273,7 +271,7 @@ func (p *externalPlugin) run(ctx context.Context) { }, })) if err != nil { - c.result <- either.RightOf[BuildResult](fmt.Errorf("failed to send updated build context to plugin: %w", err)) + c.result <- result.Err[BuildResult](fmt.Errorf("failed to send updated build context to plugin: %w", err)) continue } activeBuildCmd = optional.Some[externalBuildCommand](c) @@ -291,7 +289,7 @@ func (p *externalPlugin) run(ctx context.Context) { }, })) if err != nil { - c.result <- either.RightOf[BuildResult](fmt.Errorf("failed to start build stream: %w", err)) + c.result <- result.Err[BuildResult](fmt.Errorf("failed to start build stream: %w", err)) continue } activeBuildCmd = optional.Some[externalBuildCommand](c) @@ -299,27 +297,23 @@ func (p *externalPlugin) run(ctx context.Context) { streamCancel = newCancelFunc // Receive messages from the current build stream - case eitherResult := <-streamChan: - if eitherResult == nil { - streamChan = nil - streamCancel = nil - continue - } - - var e *langpb.BuildEvent - switch r := eitherResult.(type) { - case either.Left[*langpb.BuildEvent, error]: - e = r.Get() - case either.Right[*langpb.BuildEvent, error]: + case r := <-streamChan: + e, err := r.Result() + if err != nil { // Stream failed if c, ok := activeBuildCmd.Get(); ok { - c.result <- either.RightOf[BuildResult](r.Get()) + c.result <- result.Err[BuildResult](err) activeBuildCmd = optional.None[externalBuildCommand]() } streamCancel = nil streamChan = nil + } + if e == nil { + streamChan = nil + streamCancel = nil continue } + switch event := e.Event.(type) { case *langpb.BuildEvent_LogMessage: logger.Logf(langpb.LogLevelFromProto(event.LogMessage.Level), "%s", event.LogMessage.Message) @@ -376,15 +370,12 @@ func getBuildSuccessOrFailure(e *langpb.BuildEvent) (result either.Either[*langp } // handleBuildResult processes the result of a build and publishes the appropriate events. -func (p *externalPlugin) handleBuildResult(module string, result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], activeBuildCmd optional.Option[externalBuildCommand]) (streamEnded, cmdEnded bool) { - buildResult, err := buildResultFromProto(result) +func (p *externalPlugin) handleBuildResult(module string, r either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], activeBuildCmd optional.Option[externalBuildCommand]) (streamEnded, cmdEnded bool) { + buildResult, err := buildResultFromProto(r) if cmd, ok := activeBuildCmd.Get(); ok { // handle explicit build - if err != nil { - cmd.result <- either.RightOf[BuildResult](err) - } else { - cmd.result <- either.LeftOf[error](buildResult) - } + cmd.result <- result.From(buildResult, err) + cmdEnded = true if !cmd.rebuildAutomatically { streamEnded = true @@ -392,17 +383,10 @@ func (p *externalPlugin) handleBuildResult(module string, result either.Either[* return } // handle auto rebuild - if err != nil { - p.updates.Publish(AutoRebuildEndedEvent{ - Module: module, - Result: either.RightOf[BuildResult](err), - }) - } else { - p.updates.Publish(AutoRebuildEndedEvent{ - Module: module, - Result: either.LeftOf[error](buildResult), - }) - } + p.updates.Publish(AutoRebuildEndedEvent{ + Module: module, + Result: result.From(buildResult, err), + }) return } diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go index f6113f8968..0b2cf41e6d 100644 --- a/internal/buildengine/languageplugin/external_plugin_client.go +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -7,7 +7,7 @@ import ( "syscall" "connectrpc.com/connect" - "github.com/alecthomas/types/either" + "github.com/alecthomas/types/result" "github.com/jpillora/backoff" langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" @@ -25,7 +25,7 @@ type externalPluginClient interface { moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) - build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) + build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan result.Result[*langpb.BuildEvent], streamCancelFunc, error) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) kill() error @@ -144,13 +144,13 @@ func (p *externalPluginImpl) getDependencies(ctx context.Context, req *connect.R return resp, nil } -func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) { +func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan result.Result[*langpb.BuildEvent], streamCancelFunc, error) { stream, err := p.client.Build(ctx, req) if err != nil { return nil, nil, err //nolint:wrapcheck } - streamChan := make(chan either.Either[*langpb.BuildEvent, error], 64) + streamChan := make(chan result.Result[*langpb.BuildEvent], 64) go streamToChan(stream, streamChan) return streamChan, func() { @@ -159,12 +159,12 @@ func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[lan }, nil } -func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch chan either.Either[*langpb.BuildEvent, error]) { +func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch chan result.Result[*langpb.BuildEvent]) { for stream.Receive() { - ch <- either.LeftOf[error](stream.Msg()) + ch <- result.From(stream.Msg(), nil) } if err := stream.Err(); err != nil { - ch <- either.RightOf[*langpb.BuildEvent](err) + ch <- result.Err[*langpb.BuildEvent](err) } close(ch) } diff --git a/internal/buildengine/languageplugin/external_plugin_test.go b/internal/buildengine/languageplugin/external_plugin_test.go index 0381490047..df4e132931 100644 --- a/internal/buildengine/languageplugin/external_plugin_test.go +++ b/internal/buildengine/languageplugin/external_plugin_test.go @@ -10,8 +10,8 @@ import ( "github.com/alecthomas/assert/v2" "github.com/alecthomas/atomic" "github.com/alecthomas/kong" - "github.com/alecthomas/types/either" "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" "connectrpc.com/connect" langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" @@ -32,7 +32,7 @@ type mockExternalPluginClient struct { // atomic.Value does not allow us to atomically publish, close and replace the chan buildEventsLock *sync.Mutex - buildEvents chan either.Either[*langpb.BuildEvent, error] + buildEvents chan result.Result[*langpb.BuildEvent] latestBuildContext atomic.Value[testBuildContext] } @@ -42,7 +42,7 @@ var _ externalPluginClient = &mockExternalPluginClient{} func newMockExternalPlugin() *mockExternalPluginClient { return &mockExternalPluginClient{ buildEventsLock: &sync.Mutex{}, - buildEvents: make(chan either.Either[*langpb.BuildEvent, error], 64), + buildEvents: make(chan result.Result[*langpb.BuildEvent], 64), } } @@ -89,7 +89,7 @@ func buildContextFromProto(proto *langpb.BuildContext) (BuildContext, error) { }, nil } -func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan either.Either[*langpb.BuildEvent, error], streamCancelFunc, error) { +func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan result.Result[*langpb.BuildEvent], streamCancelFunc, error) { p.buildEventsLock.Lock() defer p.buildEventsLock.Unlock() @@ -234,9 +234,8 @@ func TestSimultaneousBuild(t *testing.T) { ctx, plugin, _, bctx := setUp() _ = beginBuild(ctx, plugin, bctx, false) r := beginBuild(ctx, plugin, bctx, false) - result, ok := (<-r).(either.Right[BuildResult, error]) - assert.True(t, ok, "expected error, got %v", result) - assert.Contains(t, result.Get().Error(), "build already in progress") + _, err := (<-r).Result() + assert.EqualError(t, err, "build already in progress") } func TestMismatchedBuildContextID(t *testing.T) { @@ -410,48 +409,39 @@ func (p *mockExternalPluginClient) publishBuildEvent(event *langpb.BuildEvent) { p.buildEventsLock.Lock() defer p.buildEventsLock.Unlock() - p.buildEvents <- either.LeftOf[error](event) + p.buildEvents <- result.From(event, nil) } -func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, autoRebuild bool) chan either.Either[BuildResult, error] { - result := make(chan either.Either[BuildResult, error]) +func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, autoRebuild bool) chan result.Result[BuildResult] { + resultChan := make(chan result.Result[BuildResult]) go func() { - r, err := plugin.Build(ctx, "", bctx, []string{}, autoRebuild) - if err != nil { - result <- either.RightOf[BuildResult](err) - } else { - result <- either.LeftOf[error](r) - } + resultChan <- result.From(plugin.Build(ctx, "", bctx, []string{}, autoRebuild)) }() // sleep to make sure impl has received the build context time.Sleep(300 * time.Millisecond) - return result + return resultChan } func (p *mockExternalPluginClient) breakStream() { p.buildEventsLock.Lock() defer p.buildEventsLock.Unlock() - p.buildEvents <- either.RightOf[*langpb.BuildEvent](fmt.Errorf("fake a broken stream")) + p.buildEvents <- result.Err[*langpb.BuildEvent](fmt.Errorf("fake a broken stream")) close(p.buildEvents) - p.buildEvents = make(chan either.Either[*langpb.BuildEvent, error], 64) + p.buildEvents = make(chan result.Result[*langpb.BuildEvent], 64) } -func checkResult(t *testing.T, r either.Either[BuildResult, error], expectedMsg string) { +func checkResult(t *testing.T, r result.Result[BuildResult], expectedMsg string) { t.Helper() - left, ok := r.(either.Left[BuildResult, error]) + buildResult, ok := r.Get() assert.True(t, ok, "expected build result, got %v", r) - - buildResult := left.Get() assert.Equal(t, len(buildResult.Errors), 1) assert.Equal(t, buildResult.Errors[0].Msg, expectedMsg) } -func checkStreamError(t *testing.T, r either.Either[BuildResult, error]) { +func checkStreamError(t *testing.T, r result.Result[BuildResult]) { t.Helper() - right, ok := r.(either.Right[BuildResult, error]) - assert.True(t, ok, "expected error result, got %v", r) - - assert.Equal(t, right.Get(), fmt.Errorf("fake a broken stream")) + _, err := r.Result() + assert.EqualError(t, err, "fake a broken stream") } func checkAutoRebuildResult(t *testing.T, e PluginEvent, expectedMsg string) { diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index 83188c3e42..cfb6944aa1 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -10,6 +10,7 @@ import ( "github.com/alecthomas/kong" "github.com/alecthomas/types/either" "github.com/alecthomas/types/pubsub" + "github.com/alecthomas/types/result" "github.com/TBD54566975/ftl/internal/bind" "github.com/TBD54566975/ftl/internal/builderrors" @@ -54,7 +55,7 @@ func (e AutoRebuildStartedEvent) ModuleName() string { return e.Module } // AutoRebuildEndedEvent is sent when the plugin ends an automatic rebuild. type AutoRebuildEndedEvent struct { Module string - Result either.Either[BuildResult, error] + Result result.Result[BuildResult] } func (AutoRebuildEndedEvent) pluginEvent() {} @@ -294,17 +295,9 @@ func (p *internalPlugin) run(ctx context.Context) { // automatic rebuild p.updates.Publish(AutoRebuildStartedEvent{Module: bctx.Config.Module}) - result, err := buildAndLoadResult(ctx, projectRoot, bctx, buildEnv, true, watcher, p.buildFunc) - if err != nil { - p.updates.Publish(AutoRebuildEndedEvent{ - Module: bctx.Config.Module, - Result: either.RightOf[BuildResult](err), - }) - continue - } p.updates.Publish(AutoRebuildEndedEvent{ Module: bctx.Config.Module, - Result: either.LeftOf[error](result), + Result: result.From(buildAndLoadResult(ctx, projectRoot, bctx, buildEnv, true, watcher, p.buildFunc)), }) case watch.WatchEventModuleAdded: // ignore From 5117102cee1239f3e5c4b6cf202f612e30900ed9 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 13:07:50 +1100 Subject: [PATCH 11/13] remove handling of invalidated dependencies --- internal/buildengine/engine.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index 43afc67f42..a3c654510b 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -820,13 +820,7 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ if err != nil { terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) if errors.Is(err, errInvalidateDependencies) { - go func() { - logger := log.FromContext(ctx) - err := e.BuildAndDeploy(ctx, 1, true, moduleName) - e.reportBuildFailed(err) - terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) - logger.Errorf(err, "Build and deploy failed for module %q", moduleName) - }() + // TODO: handle this } return err } From 352df08e2e46b0c1c2009adef4e2aebc488c87c3 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 13:17:27 +1100 Subject: [PATCH 12/13] handle error for plugin that exits before ping established --- internal/buildengine/languageplugin/external_plugin_client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go index 0b2cf41e6d..7611901c74 100644 --- a/internal/buildengine/languageplugin/external_plugin_client.go +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -81,8 +81,12 @@ func (p *externalPluginImpl) start(ctx context.Context, bind *url.URL, language close(pingErr) }() + // Wait for ping result, or for the plugin to exit. Which ever happens first. select { case err := <-cmdErr: + if err == nil { + return fmt.Errorf("plugin exited with status 0 before ping was registered") + } return err case err := <-pingErr: if err != nil { From aab54de449c10ebd249586c904065d66bbf8537a Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 13:37:14 +1100 Subject: [PATCH 13/13] lint --- internal/buildengine/engine.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index a3c654510b..5a69481d76 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -819,9 +819,7 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ }, e.buildEnv, e.devMode) if err != nil { terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) - if errors.Is(err, errInvalidateDependencies) { - // TODO: handle this - } + // TODO: handle errInvalidateDependencies return err } // update files to deploy