From 3ee5b1089ffb9c7060ac5694ad38704006c0ef44 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 11 Oct 2024 13:47:04 +1100 Subject: [PATCH] feat: add external plugin to build engine (#2994) Adds `languageplugin.externalPlugin` for working with plugins over gRPC. This PR does not include any non-test implementations. Part of #2452 --- backend/controller/admin/local_client.go | 2 +- .../xyz/block/ftl/v1/language/mixins.go | 14 + frontend/cli/cmd_new.go | 2 + frontend/cli/cmd_schema_diff.go | 2 +- frontend/cli/main.go | 6 +- internal/buildengine/build.go | 32 +- internal/buildengine/engine.go | 10 +- .../languageplugin/external_plugin.go | 428 +++++++++++++++++ .../languageplugin/external_plugin_client.go | 182 +++++++ .../languageplugin/external_plugin_test.go | 452 ++++++++++++++++++ .../buildengine/languageplugin/go_plugin.go | 6 +- .../buildengine/languageplugin/java_plugin.go | 12 +- .../languageplugin/java_plugin_test.go | 9 +- internal/buildengine/languageplugin/plugin.go | 91 ++-- .../buildengine/languageplugin/rust_plugin.go | 6 +- internal/moduleconfig/moduleconfig.go | 13 +- internal/moduleconfig/moduleconfig_test.go | 9 +- 17 files changed, 1183 insertions(+), 93 deletions(-) create mode 100644 internal/buildengine/languageplugin/external_plugin.go create mode 100644 internal/buildengine/languageplugin/external_plugin_client.go create mode 100644 internal/buildengine/languageplugin/external_plugin_test.go diff --git a/backend/controller/admin/local_client.go b/backend/controller/admin/local_client.go index 5334087aae..c89f3e214d 100644 --- a/backend/controller/admin/local_client.go +++ b/backend/controller/admin/local_client.go @@ -62,7 +62,7 @@ func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context, bAllocator op if err != nil { moduleSchemas <- either.RightOf[*schema.Module](fmt.Errorf("could not load plugin for %s: %w", m.Module, err)) } - defer plugin.Kill(ctx) // nolint:errcheck + defer plugin.Kill() // nolint:errcheck customDefaults, err := plugin.ModuleConfigDefaults(ctx, m.Dir) if err != nil { diff --git a/backend/protos/xyz/block/ftl/v1/language/mixins.go b/backend/protos/xyz/block/ftl/v1/language/mixins.go index bf76ad09f8..faddd24261 100644 --- a/backend/protos/xyz/block/ftl/v1/language/mixins.go +++ b/backend/protos/xyz/block/ftl/v1/language/mixins.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/TBD54566975/ftl/internal/builderrors" + "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/slices" ) @@ -72,3 +73,16 @@ func PosFromProto(pos *Position) builderrors.Position { Filename: pos.Filename, } } + +func LogLevelFromProto(level LogMessage_LogLevel) log.Level { + switch level { + case LogMessage_INFO: + return log.Info + case LogMessage_WARN: + return log.Warn + case LogMessage_ERROR: + return log.Error + default: + panic(fmt.Sprintf("unhandled Log_Level %v", level)) + } +} diff --git a/frontend/cli/cmd_new.go b/frontend/cli/cmd_new.go index 7ac6dfd646..18204126b5 100644 --- a/frontend/cli/cmd_new.go +++ b/frontend/cli/cmd_new.go @@ -35,6 +35,8 @@ type newCmd struct { // - help text (ftl new go --help) // - default values // - environment variable overrides +// +// Language plugins take time to launch, so we return the one we created so it can be reused in Run(). func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) (optionalPlugin optional.Option[languageplugin.LanguagePlugin], err error) { if len(args) < 2 { return optionalPlugin, nil diff --git a/frontend/cli/cmd_schema_diff.go b/frontend/cli/cmd_schema_diff.go index 661727247f..60ea4a74c3 100644 --- a/frontend/cli/cmd_schema_diff.go +++ b/frontend/cli/cmd_schema_diff.go @@ -116,7 +116,7 @@ func localSchema(ctx context.Context, projectConfig projectconfig.Config, bindAl if err != nil { moduleSchemas <- either.RightOf[*schema.Module](err) } - defer plugin.Kill(ctx) // nolint:errcheck + defer plugin.Kill() // nolint:errcheck customDefaults, err := plugin.ModuleConfigDefaults(ctx, m.Dir) if err != nil { diff --git a/frontend/cli/main.go b/frontend/cli/main.go index f042bf5e01..70328f4a38 100644 --- a/frontend/cli/main.go +++ b/frontend/cli/main.go @@ -85,14 +85,16 @@ func main() { csm := ¤tStatusManager{} app := createKongApplication(&cli, csm) - languagePlugin, err := prepareNewCmd(ctx, app, os.Args[1:]) + + // Dynamically update the kong app with language specific flags for the "ftl new" command. + languagePlugin, err := prepareNewCmd(log.ContextWithNewDefaultLogger(ctx), app, os.Args[1:]) app.FatalIfErrorf(err) kctx, err := app.Parse(os.Args[1:]) app.FatalIfErrorf(err) if plugin, ok := languagePlugin.Get(); ok { - // for "ftl new" command, we only need to create the language plugin once + // Plugins take time to launch, so we bind the "ftl new" plugin to the kong context. kctx.BindTo(plugin, (*languageplugin.LanguagePlugin)(nil)) } diff --git a/internal/buildengine/build.go b/internal/buildengine/build.go index b87134b2e9..782563ee72 100644 --- a/internal/buildengine/build.go +++ b/internal/buildengine/build.go @@ -6,7 +6,7 @@ import ( "os" "time" - "github.com/alecthomas/types/either" + "github.com/alecthomas/types/result" "google.golang.org/protobuf/proto" "github.com/TBD54566975/ftl/internal/buildengine/languageplugin" @@ -17,33 +17,33 @@ import ( "github.com/TBD54566975/ftl/internal/schema" ) +var errInvalidateDependencies = errors.New("dependencies need to be updated") + // Build a module in the given directory given the schema and module config. // // A lock file is used to ensure that only one build is running at a time. -func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectRootDir string, sch *schema.Schema, config moduleconfig.ModuleConfig, buildEnv []string, devMode bool) (moduleSchema *schema.Module, deploy []string, err error) { - logger := log.FromContext(ctx).Module(config.Module).Scope("build") +// +// Returns invalidateDependenciesError if the build failed due to a change in dependencies. +func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectRootDir string, bctx languageplugin.BuildContext, buildEnv []string, devMode bool) (moduleSchema *schema.Module, deploy []string, err error) { + logger := log.FromContext(ctx).Module(bctx.Config.Module).Scope("build") ctx = log.ContextWithLogger(ctx, logger) logger.Infof("Building module") - - result, err := plugin.Build(ctx, projectRootDir, config, sch, buildEnv, devMode) - if err != nil { - return handleBuildResult(ctx, config, either.RightOf[languageplugin.BuildResult](err)) - } - return handleBuildResult(ctx, config, either.LeftOf[error](result)) + return handleBuildResult(ctx, bctx.Config, result.From(plugin.Build(ctx, projectRootDir, bctx, buildEnv, devMode))) } // handleBuildResult processes the result of a build -func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherResult either.Either[languageplugin.BuildResult, error]) (moduleSchema *schema.Module, deploy []string, err error) { +func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherResult result.Result[languageplugin.BuildResult]) (moduleSchema *schema.Module, deploy []string, err error) { logger := log.FromContext(ctx) config := c.Abs() - var result languageplugin.BuildResult - switch eitherResult := eitherResult.(type) { - case either.Right[languageplugin.BuildResult, error]: - return nil, nil, fmt.Errorf("failed to build module: %w", eitherResult.Get()) - case either.Left[languageplugin.BuildResult, error]: - result = eitherResult.Get() + result, err := eitherResult.Result() + if err != nil { + return nil, nil, fmt.Errorf("failed to build module: %w", err) + } + + if result.InvalidateDependencies { + return nil, nil, errInvalidateDependencies } var errs []error diff --git a/internal/buildengine/engine.go b/internal/buildengine/engine.go index d196f456aa..5a69481d76 100644 --- a/internal/buildengine/engine.go +++ b/internal/buildengine/engine.go @@ -487,7 +487,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration } if meta, ok := e.moduleMetas.Load(event.Config.Module); ok { meta.plugin.Updates().Unsubscribe(meta.events) - err := meta.plugin.Kill(ctx) + err := meta.plugin.Kill() if err != nil { didError = true e.reportBuildFailed(err) @@ -633,7 +633,6 @@ func (e *Engine) BuildAndDeploy(ctx context.Context, replicas int32, waitForDepl type buildCallback func(ctx context.Context, module Module) error func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, moduleNames ...string) error { - if len(moduleNames) == 0 { e.moduleMetas.Range(func(name string, meta moduleMeta) bool { moduleNames = append(moduleNames, name) @@ -813,9 +812,14 @@ func (e *Engine) build(ctx context.Context, moduleName string, builtModules map[ e.listener.OnBuildStarted(meta.module) } - moduleSchema, deploy, err := build(ctx, meta.plugin, e.projectRoot, sch, meta.module.Config, e.buildEnv, e.devMode) + moduleSchema, deploy, err := build(ctx, meta.plugin, e.projectRoot, languageplugin.BuildContext{ + Config: meta.module.Config, + Schema: sch, + Dependencies: meta.module.Dependencies, + }, e.buildEnv, e.devMode) if err != nil { terminal.UpdateModuleState(ctx, moduleName, terminal.BuildStateFailed) + // TODO: handle errInvalidateDependencies return err } // update files to deploy diff --git a/internal/buildengine/languageplugin/external_plugin.go b/internal/buildengine/languageplugin/external_plugin.go new file mode 100644 index 0000000000..adaa3edfae --- /dev/null +++ b/internal/buildengine/languageplugin/external_plugin.go @@ -0,0 +1,428 @@ +package languageplugin + +import ( + "context" + "fmt" + "net/url" + "time" + + "connectrpc.com/connect" + "github.com/alecthomas/kong" + "github.com/alecthomas/types/either" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/pubsub" + "github.com/alecthomas/types/result" + "google.golang.org/protobuf/types/known/structpb" + + langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/internal/builderrors" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/moduleconfig" + "github.com/TBD54566975/ftl/internal/projectconfig" + "github.com/TBD54566975/ftl/internal/schema" +) + +const launchTimeout = 10 * time.Second + +type externalBuildCommand struct { + BuildContext + projectRoot string + rebuildAutomatically bool + + result chan result.Result[BuildResult] +} + +type externalPlugin struct { + client externalPluginClient + + // cancels the run() context + cancel context.CancelFunc + + // commands to execute + commands chan externalBuildCommand + + updates *pubsub.Topic[PluginEvent] +} + +var _ LanguagePlugin = &externalPlugin{} + +func newExternalPlugin(ctx context.Context, bind *url.URL, language string) (*externalPlugin, error) { + impl, err := newExternalPluginImpl(ctx, bind, language) + if err != nil { + return nil, err + } + return newExternalPluginForTesting(ctx, impl), nil +} + +func newExternalPluginForTesting(ctx context.Context, client externalPluginClient) *externalPlugin { + plugin := &externalPlugin{ + client: client, + commands: make(chan externalBuildCommand, 64), + updates: pubsub.New[PluginEvent](), + } + + var runCtx context.Context + runCtx, plugin.cancel = context.WithCancel(ctx) + go plugin.run(runCtx) + + return plugin +} + +func (p *externalPlugin) Kill() error { + p.cancel() + if err := p.client.kill(); err != nil { + return fmt.Errorf("failed to kill language plugin: %w", err) + } + return nil +} + +func (p *externalPlugin) Updates() *pubsub.Topic[PluginEvent] { + return p.updates +} + +func (p *externalPlugin) GetCreateModuleFlags(ctx context.Context) ([]*kong.Flag, error) { + res, err := p.client.getCreateModuleFlags(ctx, connect.NewRequest(&langpb.GetCreateModuleFlagsRequest{})) + if err != nil { + return nil, fmt.Errorf("failed to get create module flags from plugin: %w", err) + } + flags := []*kong.Flag{} + shorts := map[rune]string{} + for _, f := range res.Msg.Flags { + flag := &kong.Flag{ + Value: &kong.Value{ + Name: f.Name, + Help: f.Help, + Tag: &kong.Tag{}, + }, + } + if f.Envar != nil && *f.Envar != "" { + flag.Value.Tag.Envs = []string{*f.Envar} + } + if f.Default != nil && *f.Default != "" { + flag.Value.HasDefault = true + flag.Value.Default = *f.Default + } + if f.Short != nil && *f.Short != "" { + if len(*f.Short) > 1 { + return nil, fmt.Errorf("invalid flag declared: short flag %q for %v must be a single character", *f.Short, f.Name) + } + short := rune((*f.Short)[0]) + if existingFullName, ok := shorts[short]; ok { + return nil, fmt.Errorf("multiple flags declared with the same short name: %v and %v", existingFullName, f.Name) + } + flag.Short = short + shorts[short] = f.Name + + } + if f.Placeholder != nil && *f.Placeholder != "" { + flag.PlaceHolder = *f.Placeholder + } + flags = append(flags, flag) + } + return flags, nil +} + +// CreateModule creates a new module in the given directory with the given name and language. +func (p *externalPlugin) CreateModule(ctx context.Context, projConfig projectconfig.Config, moduleConfig moduleconfig.ModuleConfig, flags map[string]string) error { + _, err := p.client.createModule(ctx, connect.NewRequest(&langpb.CreateModuleRequest{ + Name: moduleConfig.Module, + Path: moduleConfig.Dir, + ProjectConfig: &langpb.ProjectConfig{ + Path: projConfig.Path, + Name: projConfig.Name, + NoGit: projConfig.NoGit, + Hermit: projConfig.Hermit, + }, + })) + if err != nil { + return fmt.Errorf("failed to create module: %w", err) + } + return nil +} + +func (p *externalPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) { + resp, err := p.client.moduleConfigDefaults(ctx, connect.NewRequest(&langpb.ModuleConfigDefaultsRequest{ + Path: dir, + })) + if err != nil { + return moduleconfig.CustomDefaults{}, fmt.Errorf("failed to get module config defaults from plugin: %w", err) + } + return moduleconfig.CustomDefaults{ + DeployDir: resp.Msg.DeployDir, + Watch: resp.Msg.Watch, + Build: optional.Ptr(resp.Msg.Build), + GeneratedSchemaDir: optional.Ptr(resp.Msg.GeneratedSchemaDir), + LanguageConfig: resp.Msg.LanguageConfig.AsMap(), + }, nil +} + +func (p *externalPlugin) GetDependencies(ctx context.Context, config moduleconfig.ModuleConfig) ([]string, error) { + configProto, err := protoFromModuleConfig(config) + if err != nil { + return nil, err + } + resp, err := p.client.getDependencies(ctx, connect.NewRequest(&langpb.DependenciesRequest{ + ModuleConfig: configProto, + })) + if err != nil { + return nil, fmt.Errorf("failed to get dependencies from plugin: %w", err) + } + return resp.Msg.Modules, nil +} + +// Build may result in a Build or BuildContextUpdated grpc call with the plugin, depending if a build stream is already set up +func (p *externalPlugin) Build(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool) (BuildResult, error) { + cmd := externalBuildCommand{ + BuildContext: bctx, + projectRoot: projectRoot, + rebuildAutomatically: rebuildAutomatically, + result: make(chan result.Result[BuildResult]), + } + p.commands <- cmd + select { + case r := <-cmd.result: + result, err := r.Result() + if err != nil { + return BuildResult{}, err //nolint:wrapcheck + } + return result, nil + case <-ctx.Done(): + return BuildResult{}, fmt.Errorf("error waiting for build to complete: %w", ctx.Err()) + } +} + +func protoFromModuleConfig(c moduleconfig.ModuleConfig) (*langpb.ModuleConfig, error) { + config := c.Abs() + proto := &langpb.ModuleConfig{ + Name: config.Module, + Path: config.Dir, + DeployDir: config.DeployDir, + Watch: config.Watch, + } + if config.Build != "" { + proto.Build = &config.Build + } + if config.GeneratedSchemaDir != "" { + proto.GeneratedSchemaDir = &config.GeneratedSchemaDir + } + + langConfigProto, err := structpb.NewStruct(config.LanguageConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal language config: %w", err) + } + proto.LanguageConfig = langConfigProto + + return proto, nil +} + +func (p *externalPlugin) run(ctx context.Context) { + // State + var bctx BuildContext + var projectRoot string + + // if a current build stream is active, this is non-nil + // this does not indicate if the stream is listening to automatic rebuilds + var streamChan chan result.Result[*langpb.BuildEvent] + var streamCancel streamCancelFunc + + // if an explicit build command is active, this is non-nil + // if this is nil, streamChan may still be open for automatic rebuilds + var activeBuildCmd optional.Option[externalBuildCommand] + + // build counter is used to generate build request ids + var contextCounter = 0 + + // can not scope logger initially without knowing module name + logger := log.FromContext(ctx) + + for { + select { + // Process incoming commands + case c := <-p.commands: + // update state + contextCounter++ + bctx = c.BuildContext + projectRoot = c.projectRoot + + // module name may have changed, update logger scope + logger = log.FromContext(ctx).Scope(bctx.Config.Module) + + if _, ok := activeBuildCmd.Get(); ok { + c.result <- result.Err[BuildResult](fmt.Errorf("build already in progress")) + continue + } + configProto, err := protoFromModuleConfig(bctx.Config) + if err != nil { + c.result <- result.Err[BuildResult](err) + continue + } + + schemaProto := bctx.Schema.ToProto().(*schemapb.Schema) //nolint:forcetypeassert + + if streamChan != nil { + // tell plugin about new build context so that it rebuilds in existing build stream + _, err = p.client.buildContextUpdated(ctx, connect.NewRequest(&langpb.BuildContextUpdatedRequest{ + BuildContext: &langpb.BuildContext{ + Id: contextID(bctx.Config, contextCounter), + ModuleConfig: configProto, + Schema: schemaProto, + Dependencies: bctx.Dependencies, + }, + })) + if err != nil { + c.result <- result.Err[BuildResult](fmt.Errorf("failed to send updated build context to plugin: %w", err)) + continue + } + activeBuildCmd = optional.Some[externalBuildCommand](c) + continue + } + + newStreamChan, newCancelFunc, err := p.client.build(ctx, connect.NewRequest(&langpb.BuildRequest{ + ProjectPath: projectRoot, + RebuildAutomatically: c.rebuildAutomatically, + BuildContext: &langpb.BuildContext{ + Id: contextID(bctx.Config, contextCounter), + ModuleConfig: configProto, + Schema: schemaProto, + Dependencies: bctx.Dependencies, + }, + })) + if err != nil { + c.result <- result.Err[BuildResult](fmt.Errorf("failed to start build stream: %w", err)) + continue + } + activeBuildCmd = optional.Some[externalBuildCommand](c) + streamChan = newStreamChan + streamCancel = newCancelFunc + + // Receive messages from the current build stream + case r := <-streamChan: + e, err := r.Result() + if err != nil { + // Stream failed + if c, ok := activeBuildCmd.Get(); ok { + c.result <- result.Err[BuildResult](err) + activeBuildCmd = optional.None[externalBuildCommand]() + } + streamCancel = nil + streamChan = nil + } + if e == nil { + streamChan = nil + streamCancel = nil + continue + } + + switch event := e.Event.(type) { + case *langpb.BuildEvent_LogMessage: + logger.Logf(langpb.LogLevelFromProto(event.LogMessage.Level), "%s", event.LogMessage.Message) + case *langpb.BuildEvent_AutoRebuildStarted: + if _, ok := activeBuildCmd.Get(); ok { + logger.Debugf("ignoring automatic rebuild started during explicit build") + continue + } + p.updates.Publish(AutoRebuildStartedEvent{ + Module: bctx.Config.Module, + }) + case *langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure: + streamEnded := false + cmdEnded := false + result, eventContextID, isAutomaticRebuild := getBuildSuccessOrFailure(e) + if activeBuildCmd.Ok() == isAutomaticRebuild { + logger.Debugf("ignoring automatic rebuild while expecting explicit build") + continue + } else if eventContextID != contextID(bctx.Config, contextCounter) { + logger.Debugf("received build for outdated context %q; expected %q", eventContextID, contextID(bctx.Config, contextCounter)) + continue + } + streamEnded, cmdEnded = p.handleBuildResult(bctx.Config.Module, result, activeBuildCmd) + if streamEnded { + streamCancel() + streamCancel = nil + streamChan = nil + } + if cmdEnded { + activeBuildCmd = optional.None[externalBuildCommand]() + } + } + + case <-ctx.Done(): + if streamCancel != nil { + streamCancel() + } + return + } + } +} + +// getBuildSuccessOrFailure takes a BuildFailure or BuildSuccess event and returns the shared fields and an either wrapped result. +// This makes it easier to have some shared logic for both event types. +func getBuildSuccessOrFailure(e *langpb.BuildEvent) (result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], contextID string, isAutomaticRebuild bool) { + switch e := e.Event.(type) { + case *langpb.BuildEvent_BuildSuccess: + return either.LeftOf[*langpb.BuildEvent_BuildFailure](e), e.BuildSuccess.ContextId, e.BuildSuccess.IsAutomaticRebuild + case *langpb.BuildEvent_BuildFailure: + return either.RightOf[*langpb.BuildEvent_BuildSuccess](e), e.BuildFailure.ContextId, e.BuildFailure.IsAutomaticRebuild + default: + panic(fmt.Sprintf("unexpected event type %T", e)) + } +} + +// handleBuildResult processes the result of a build and publishes the appropriate events. +func (p *externalPlugin) handleBuildResult(module string, r either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure], activeBuildCmd optional.Option[externalBuildCommand]) (streamEnded, cmdEnded bool) { + buildResult, err := buildResultFromProto(r) + if cmd, ok := activeBuildCmd.Get(); ok { + // handle explicit build + cmd.result <- result.From(buildResult, err) + + cmdEnded = true + if !cmd.rebuildAutomatically { + streamEnded = true + } + return + } + // handle auto rebuild + p.updates.Publish(AutoRebuildEndedEvent{ + Module: module, + Result: result.From(buildResult, err), + }) + return +} + +func buildResultFromProto(result either.Either[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure]) (buildResult BuildResult, err error) { + switch result := result.(type) { + case either.Left[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure]: + buildSuccess := result.Get().BuildSuccess + var moduleSch *schema.Module + if buildSuccess.Module != nil { + sch, err := schema.ModuleFromProto(buildSuccess.Module) + if err != nil { + return BuildResult{}, fmt.Errorf("failed to parse schema: %w", err) + } + moduleSch = sch + } + + errs := langpb.ErrorsFromProto(buildSuccess.Errors) + builderrors.SortErrorsByPosition(errs) + return BuildResult{ + Errors: errs, + Schema: moduleSch, + }, nil + case either.Right[*langpb.BuildEvent_BuildSuccess, *langpb.BuildEvent_BuildFailure]: + buildFailure := result.Get().BuildFailure + + errs := langpb.ErrorsFromProto(buildFailure.Errors) + builderrors.SortErrorsByPosition(errs) + return BuildResult{ + Errors: errs, + InvalidateDependencies: buildFailure.InvalidateDependencies, + }, nil + default: + panic(fmt.Sprintf("unexpected result type %T", result)) + } +} + +func contextID(config moduleconfig.ModuleConfig, counter int) string { + return fmt.Sprintf("%v-%v", config.Module, counter) +} diff --git a/internal/buildengine/languageplugin/external_plugin_client.go b/internal/buildengine/languageplugin/external_plugin_client.go new file mode 100644 index 0000000000..7611901c74 --- /dev/null +++ b/internal/buildengine/languageplugin/external_plugin_client.go @@ -0,0 +1,182 @@ +package languageplugin + +import ( + "context" + "fmt" + "net/url" + "syscall" + + "connectrpc.com/connect" + "github.com/alecthomas/types/result" + "github.com/jpillora/backoff" + + langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" + langconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language/languagepbconnect" + "github.com/TBD54566975/ftl/internal/exec" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/rpc" +) + +type streamCancelFunc func() + +type externalPluginClient interface { + getCreateModuleFlags(ctx context.Context, req *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) + createModule(ctx context.Context, req *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) + moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) + getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) + + build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan result.Result[*langpb.BuildEvent], streamCancelFunc, error) + buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) + + kill() error +} + +var _ externalPluginClient = &externalPluginImpl{} + +type externalPluginImpl struct { + cmd *exec.Cmd + client langconnect.LanguageServiceClient +} + +func newExternalPluginImpl(ctx context.Context, bind *url.URL, language string) (*externalPluginImpl, error) { + impl := &externalPluginImpl{ + client: rpc.Dial(langconnect.NewLanguageServiceClient, bind.String(), log.Error), + } + err := impl.start(ctx, bind, language) + if err != nil { + return nil, fmt.Errorf("failed to start plugin: %w", err) + } + return impl, nil +} + +// Start launches the plugin and blocks until the plugin is ready. +func (p *externalPluginImpl) start(ctx context.Context, bind *url.URL, language string) error { + cmdName := "ftl-language-" + language + p.cmd = exec.Command(ctx, log.Debug, ".", cmdName, "--bind", bind.String()) + _, err := exec.LookPath(cmdName) + if err != nil { + return fmt.Errorf("failed to find plugin for %s: %w", language, err) + } + + runCtx, cancel := context.WithCancel(ctx) + + cmdErr := make(chan error) + pingErr := make(chan error) + + // run the plugin and wait for it to finish executing + go func() { + err := p.cmd.RunBuffered(runCtx) + if err != nil { + cmdErr <- fmt.Errorf("language plugin failed: %w", err) + cancel() + } + close(cmdErr) + }() + go func() { + // wait for the plugin to be ready + if err := p.ping(runCtx); err != nil { + cancel() + pingErr <- fmt.Errorf("failed to ping plugin") + } + close(pingErr) + }() + + // Wait for ping result, or for the plugin to exit. Which ever happens first. + select { + case err := <-cmdErr: + if err == nil { + return fmt.Errorf("plugin exited with status 0 before ping was registered") + } + return err + case err := <-pingErr: + if err != nil { + return nil + } + return fmt.Errorf("failed to start plugin: %w", err) + case <-ctx.Done(): + return fmt.Errorf("failed to start plugin: %w", ctx.Err()) + } +} + +func (p *externalPluginImpl) ping(ctx context.Context) error { + retry := backoff.Backoff{} + heartbeatCtx, cancel := context.WithTimeout(ctx, launchTimeout) + defer cancel() + err := rpc.Wait(heartbeatCtx, retry, p.client) + if err != nil { + return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to connect to runner: %w", err)) + } + return nil +} + +func (p *externalPluginImpl) kill() error { + if err := p.cmd.Kill(syscall.SIGINT); err != nil { + return err //nolint:wrapcheck + } + return nil +} + +func (p *externalPluginImpl) getCreateModuleFlags(ctx context.Context, req *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) { + resp, err := p.client.GetCreateModuleFlags(ctx, req) + if err != nil { + return nil, err //nolint:wrapcheck + } + return resp, nil +} + +func (p *externalPluginImpl) moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) { + resp, err := p.client.ModuleConfigDefaults(ctx, req) + if err != nil { + return nil, err //nolint:wrapcheck + } + return resp, nil +} + +func (p *externalPluginImpl) createModule(ctx context.Context, req *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) { + resp, err := p.client.CreateModule(ctx, req) + if err != nil { + return nil, err //nolint:wrapcheck + } + return resp, nil +} + +func (p *externalPluginImpl) getDependencies(ctx context.Context, req *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) { + resp, err := p.client.GetDependencies(ctx, req) + if err != nil { + return nil, err //nolint:wrapcheck + } + return resp, nil +} + +func (p *externalPluginImpl) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan result.Result[*langpb.BuildEvent], streamCancelFunc, error) { + stream, err := p.client.Build(ctx, req) + if err != nil { + return nil, nil, err //nolint:wrapcheck + } + + streamChan := make(chan result.Result[*langpb.BuildEvent], 64) + go streamToChan(stream, streamChan) + + return streamChan, func() { + stream.Close() + close(streamChan) + }, nil +} + +func streamToChan(stream *connect.ServerStreamForClient[langpb.BuildEvent], ch chan result.Result[*langpb.BuildEvent]) { + for stream.Receive() { + ch <- result.From(stream.Msg(), nil) + } + if err := stream.Err(); err != nil { + ch <- result.Err[*langpb.BuildEvent](err) + } + close(ch) +} + +func (p *externalPluginImpl) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) { + resp, err := p.client.BuildContextUpdated(ctx, req) + if err != nil { + return nil, err //nolint:wrapcheck + } + return resp, nil +} diff --git a/internal/buildengine/languageplugin/external_plugin_test.go b/internal/buildengine/languageplugin/external_plugin_test.go new file mode 100644 index 0000000000..df4e132931 --- /dev/null +++ b/internal/buildengine/languageplugin/external_plugin_test.go @@ -0,0 +1,452 @@ +package languageplugin + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/atomic" + "github.com/alecthomas/kong" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" + + "connectrpc.com/connect" + langpb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/language" + "github.com/TBD54566975/ftl/internal/builderrors" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/moduleconfig" + "github.com/TBD54566975/ftl/internal/schema" +) + +type testBuildContext struct { + BuildContext + ContextID string + IsRebuild bool +} + +type mockExternalPluginClient struct { + flags []*langpb.GetCreateModuleFlagsResponse_Flag + + // atomic.Value does not allow us to atomically publish, close and replace the chan + buildEventsLock *sync.Mutex + buildEvents chan result.Result[*langpb.BuildEvent] + + latestBuildContext atomic.Value[testBuildContext] +} + +var _ externalPluginClient = &mockExternalPluginClient{} + +func newMockExternalPlugin() *mockExternalPluginClient { + return &mockExternalPluginClient{ + buildEventsLock: &sync.Mutex{}, + buildEvents: make(chan result.Result[*langpb.BuildEvent], 64), + } +} + +func (p *mockExternalPluginClient) getCreateModuleFlags(context.Context, *connect.Request[langpb.GetCreateModuleFlagsRequest]) (*connect.Response[langpb.GetCreateModuleFlagsResponse], error) { + return connect.NewResponse(&langpb.GetCreateModuleFlagsResponse{ + Flags: p.flags, + }), nil +} + +func (p *mockExternalPluginClient) createModule(context.Context, *connect.Request[langpb.CreateModuleRequest]) (*connect.Response[langpb.CreateModuleResponse], error) { + panic("not implemented") +} + +func (p *mockExternalPluginClient) moduleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) { + return connect.NewResponse(&langpb.ModuleConfigDefaultsResponse{ + DeployDir: "test-deploy-dir", + Watch: []string{"a", "b", "c"}, + }), nil +} + +func (p *mockExternalPluginClient) getDependencies(context.Context, *connect.Request[langpb.DependenciesRequest]) (*connect.Response[langpb.DependenciesResponse], error) { + panic("not implemented") +} + +func buildContextFromProto(proto *langpb.BuildContext) (BuildContext, error) { + sch, err := schema.FromProto(proto.Schema) + if err != nil { + return BuildContext{}, fmt.Errorf("could not load schema from build context proto: %w", err) + } + return BuildContext{ + Schema: sch, + Dependencies: proto.Dependencies, + Config: moduleconfig.ModuleConfig{ + Dir: proto.ModuleConfig.Path, + Language: "test", + Realm: "test", + Module: proto.ModuleConfig.Name, + Build: optional.Ptr(proto.ModuleConfig.Build).Default(""), + DeployDir: proto.ModuleConfig.DeployDir, + GeneratedSchemaDir: optional.Ptr(proto.ModuleConfig.GeneratedSchemaDir).Default(""), + Watch: proto.ModuleConfig.Watch, + LanguageConfig: proto.ModuleConfig.LanguageConfig.AsMap(), + }, + }, nil +} + +func (p *mockExternalPluginClient) build(ctx context.Context, req *connect.Request[langpb.BuildRequest]) (chan result.Result[*langpb.BuildEvent], streamCancelFunc, error) { + p.buildEventsLock.Lock() + defer p.buildEventsLock.Unlock() + + bctx, err := buildContextFromProto(req.Msg.BuildContext) + if err != nil { + return nil, nil, err + } + p.latestBuildContext.Store(testBuildContext{ + BuildContext: bctx, + ContextID: req.Msg.BuildContext.Id, + IsRebuild: false, + }) + return p.buildEvents, func() {}, nil +} + +func (p *mockExternalPluginClient) buildContextUpdated(ctx context.Context, req *connect.Request[langpb.BuildContextUpdatedRequest]) (*connect.Response[langpb.BuildContextUpdatedResponse], error) { + bctx, err := buildContextFromProto(req.Msg.BuildContext) + if err != nil { + return nil, err + } + p.latestBuildContext.Store(testBuildContext{ + BuildContext: bctx, + ContextID: req.Msg.BuildContext.Id, + IsRebuild: true, + }) + return connect.NewResponse(&langpb.BuildContextUpdatedResponse{}), nil +} + +func (p *mockExternalPluginClient) kill() error { + return nil +} + +func setUp() (context.Context, *externalPlugin, *mockExternalPluginClient, BuildContext) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + + mockImpl := newMockExternalPlugin() + plugin := newExternalPluginForTesting(ctx, mockImpl) + + bctx := BuildContext{ + Config: moduleconfig.ModuleConfig{ + Module: "name", + Dir: "test/dir", + Language: "test-lang", + }, + Schema: &schema.Schema{}, + Dependencies: []string{}, + } + return ctx, plugin, mockImpl, bctx +} + +func TestCreateModuleFlags(t *testing.T) { + t.Parallel() + for _, tt := range []struct { + protoFlags []*langpb.GetCreateModuleFlagsResponse_Flag + expectedFlags []*kong.Flag + expectedError optional.Option[string] + }{ + { + protoFlags: []*langpb.GetCreateModuleFlagsResponse_Flag{ + { + Name: "full-flag", + Help: "This has all the fields set", + Envar: optional.Some("full-flag").Ptr(), + Short: optional.Some("f").Ptr(), + Placeholder: optional.Some("placeholder").Ptr(), + Default: optional.Some("defaultValue").Ptr(), + }, + { + Name: "sparse-flag", + Help: "This has only the minimum fields set", + }, + }, + expectedFlags: []*kong.Flag{ + { + Value: &kong.Value{ + Name: "full-flag", + Help: "This has all the fields set", + HasDefault: true, + Default: "defaultValue", + Tag: &kong.Tag{ + Envs: []string{ + "full-flag", + }, + }, + }, + PlaceHolder: "placeholder", + Short: 'f', + }, + { + Value: &kong.Value{ + Name: "sparse-flag", + Help: "This has only the minimum fields set", + Tag: &kong.Tag{}, + }, + }, + }, + }, + { + protoFlags: []*langpb.GetCreateModuleFlagsResponse_Flag{ + { + Name: "multi-char-short", + Help: "This has all the fields set", + Short: optional.Some("multi").Ptr(), + }, + }, + expectedError: optional.Some(`invalid flag declared: short flag "multi" for multi-char-short must be a single character`), + }, + { + protoFlags: []*langpb.GetCreateModuleFlagsResponse_Flag{ + { + Name: "dupe-short-1", + Help: "Short must be unique", + Short: optional.Some("d").Ptr(), + }, + { + Name: "dupe-short-2", + Help: "Short must be unique", + Short: optional.Some("d").Ptr(), + }, + }, + expectedError: optional.Some(`multiple flags declared with the same short name: dupe-short-1 and dupe-short-2`), + }, + } { + t.Run(tt.protoFlags[0].Name, func(t *testing.T) { + t.Parallel() + + ctx, plugin, mockImpl, _ := setUp() + mockImpl.flags = tt.protoFlags + kongFlags, err := plugin.GetCreateModuleFlags(ctx) + if expectedError, ok := tt.expectedError.Get(); ok { + assert.Contains(t, err.Error(), expectedError) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.expectedFlags, kongFlags) + }) + } +} + +func TestSimultaneousBuild(t *testing.T) { + t.Parallel() + ctx, plugin, _, bctx := setUp() + _ = beginBuild(ctx, plugin, bctx, false) + r := beginBuild(ctx, plugin, bctx, false) + _, err := (<-r).Result() + assert.EqualError(t, err, "build already in progress") +} + +func TestMismatchedBuildContextID(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + // build + result := beginBuild(ctx, plugin, bctx, false) + + // send mismatched build result (ie: a different build attempt completing) + mockImpl.publishBuildEvent(buildEventWithBuildError("fake", false, "this is not the result you are looking for")) + + // send automatic rebuild result for the same context id (should be ignored) + realID := mockImpl.latestBuildContext.Load().ContextID + mockImpl.publishBuildEvent(buildEventWithBuildError(realID, true, "this is not the result you are looking for")) + + // send real build result + mockImpl.publishBuildEvent(buildEventWithBuildError(realID, false, "this is the correct result")) + + // check result + checkResult(t, <-result, "this is the correct result") +} + +func TestRebuilds(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + // build and activate automatic rebuilds + result := beginBuild(ctx, plugin, bctx, true) + + // send first build result + testBuildCtx := mockImpl.latestBuildContext.Load() + mockImpl.publishBuildEvent(buildEventWithBuildError(testBuildCtx.ContextID, false, "first build")) + + // check result + checkResult(t, <-result, "first build") + + // send rebuild request with updated schema + bctx.Schema.Modules = append(bctx.Schema.Modules, &schema.Module{Name: "another"}) + sch, err := schema.ValidateSchema(bctx.Schema) + assert.NoError(t, err, "schema should be valid") + result = beginBuild(ctx, plugin, bctx, true) + + // send rebuild result + testBuildCtx = mockImpl.latestBuildContext.Load() + assert.Equal(t, testBuildCtx.Schema, sch, "schema should have been updated") + mockImpl.publishBuildEvent(buildEventWithBuildError(testBuildCtx.ContextID, false, "second build")) + + // check rebuild result + checkResult(t, <-result, "second build") +} + +func TestAutomaticRebuilds(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + updates := make(chan PluginEvent, 64) + plugin.Updates().Subscribe(updates) + + // build and activate automatic rebuilds + result := beginBuild(ctx, plugin, bctx, true) + + // plugin sends auto rebuild has started event (should be ignored) + mockImpl.publishBuildEvent(&langpb.BuildEvent{ + Event: &langpb.BuildEvent_AutoRebuildStarted{}, + }) + // plugin sends auto rebuild event (should be ignored) + mockImpl.publishBuildEvent(buildEventWithBuildError("fake", true, "auto rebuild to ignore")) + + // send first build result + time.Sleep(200 * time.Millisecond) + buildCtx := mockImpl.latestBuildContext.Load() + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, false, "first build")) + + // check result + checkResult(t, <-result, "first build") + + // confirm that nothing was posted to Updates() (ie: the auto-rebuilds events were ignored) + select { + case <-updates: + t.Fatalf("expected auto rebuilds events to not get published while build is in progress") + case <-time.After(2 * time.Second): + // as expected, no events published plugin + } + + // plugin sends auto rebuild events + mockImpl.publishBuildEvent(&langpb.BuildEvent{ + Event: &langpb.BuildEvent_AutoRebuildStarted{}, + }) + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, true, "first real auto rebuild")) + // plugin sends auto rebuild events again (this time with no rebuild started event) + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, true, "second real auto rebuild")) + + // confirm that auto rebuilds events were published + events := eventsFromChannel(updates) + assert.Equal(t, len(events), 3, "expected 3 events") + assert.Equal(t, PluginEvent(AutoRebuildStartedEvent{Module: bctx.Config.Module}), events[0]) + checkAutoRebuildResult(t, events[1], "first real auto rebuild") + checkAutoRebuildResult(t, events[2], "second real auto rebuild") +} + +func TestBrokenBuildStream(t *testing.T) { + t.Parallel() + ctx, plugin, mockImpl, bctx := setUp() + + updates := make(chan PluginEvent, 64) + plugin.Updates().Subscribe(updates) + + // build and activate automatic rebuilds + result := beginBuild(ctx, plugin, bctx, true) + + // break the stream + mockImpl.breakStream() + checkStreamError(t, <-result) + + // build again + result = beginBuild(ctx, plugin, bctx, true) + + // send build result + buildCtx := mockImpl.latestBuildContext.Load() + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, false, "first build")) + checkResult(t, <-result, "first build") + + // break the stream + mockImpl.breakStream() + + // build again + result = beginBuild(ctx, plugin, bctx, true) + // confirm that a Build call was made instead of a BuildContextUpdated call + assert.False(t, mockImpl.latestBuildContext.Load().IsRebuild, "after breaking the stream, FTL should send a Build call instead of a BuildContextUpdated call") + + // send build result + buildCtx = mockImpl.latestBuildContext.Load() + mockImpl.publishBuildEvent(buildEventWithBuildError(buildCtx.ContextID, false, "second build")) + checkResult(t, <-result, "second build") +} + +func eventsFromChannel(updates chan PluginEvent) []PluginEvent { + // wait a bit to let events get published + time.Sleep(200 * time.Millisecond) + + events := []PluginEvent{} + for { + select { + case e := <-updates: + events = append(events, e) + default: + // no more events available right now + return events + } + } +} + +func buildEventWithBuildError(contextID string, isAutomaticRebuild bool, msg string) *langpb.BuildEvent { + return &langpb.BuildEvent{ + Event: &langpb.BuildEvent_BuildFailure{ + BuildFailure: &langpb.BuildFailure{ + ContextId: contextID, + IsAutomaticRebuild: isAutomaticRebuild, + Errors: langpb.ErrorsToProto([]builderrors.Error{ + { + Msg: msg, + }, + }), + }, + }, + } +} + +func (p *mockExternalPluginClient) publishBuildEvent(event *langpb.BuildEvent) { + p.buildEventsLock.Lock() + defer p.buildEventsLock.Unlock() + + p.buildEvents <- result.From(event, nil) +} + +func beginBuild(ctx context.Context, plugin *externalPlugin, bctx BuildContext, autoRebuild bool) chan result.Result[BuildResult] { + resultChan := make(chan result.Result[BuildResult]) + go func() { + resultChan <- result.From(plugin.Build(ctx, "", bctx, []string{}, autoRebuild)) + }() + // sleep to make sure impl has received the build context + time.Sleep(300 * time.Millisecond) + return resultChan +} + +func (p *mockExternalPluginClient) breakStream() { + p.buildEventsLock.Lock() + defer p.buildEventsLock.Unlock() + p.buildEvents <- result.Err[*langpb.BuildEvent](fmt.Errorf("fake a broken stream")) + close(p.buildEvents) + p.buildEvents = make(chan result.Result[*langpb.BuildEvent], 64) +} + +func checkResult(t *testing.T, r result.Result[BuildResult], expectedMsg string) { + t.Helper() + buildResult, ok := r.Get() + assert.True(t, ok, "expected build result, got %v", r) + assert.Equal(t, len(buildResult.Errors), 1) + assert.Equal(t, buildResult.Errors[0].Msg, expectedMsg) +} + +func checkStreamError(t *testing.T, r result.Result[BuildResult]) { + t.Helper() + _, err := r.Result() + assert.EqualError(t, err, "fake a broken stream") +} + +func checkAutoRebuildResult(t *testing.T, e PluginEvent, expectedMsg string) { + t.Helper() + event, ok := e.(AutoRebuildEndedEvent) + assert.True(t, ok, "expected auto rebuild event, got %v", e) + checkResult(t, event.Result, expectedMsg) +} diff --git a/internal/buildengine/languageplugin/go_plugin.go b/internal/buildengine/languageplugin/go_plugin.go index 5266c5d074..59c7288494 100644 --- a/internal/buildengine/languageplugin/go_plugin.go +++ b/internal/buildengine/languageplugin/go_plugin.go @@ -23,7 +23,6 @@ import ( "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/moduleconfig" "github.com/TBD54566975/ftl/internal/projectconfig" - "github.com/TBD54566975/ftl/internal/schema" "github.com/TBD54566975/ftl/internal/watch" ) @@ -159,8 +158,9 @@ func (p *goPlugin) GetDependencies(ctx context.Context, config moduleconfig.Modu }) } -func buildGo(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { - moduleSch, buildErrs, err := compile.Build(ctx, projectRoot, config.Dir, config, sch, transaction, buildEnv, devMode) +func buildGo(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { + config := bctx.Config.Abs() + moduleSch, buildErrs, err := compile.Build(ctx, projectRoot, config.Dir, config, bctx.Schema, transaction, buildEnv, devMode) if err != nil { return BuildResult{}, CompilerBuildError{err: fmt.Errorf("failed to build module %q: %w", config.Module, err)} } diff --git a/internal/buildengine/languageplugin/java_plugin.go b/internal/buildengine/languageplugin/java_plugin.go index 14de6643ee..1d163c86dc 100644 --- a/internal/buildengine/languageplugin/java_plugin.go +++ b/internal/buildengine/languageplugin/java_plugin.go @@ -14,6 +14,7 @@ import ( "github.com/TBD54566975/scaffolder" "github.com/alecthomas/kong" + "github.com/alecthomas/types/optional" "github.com/beevik/etree" "github.com/go-viper/mapstructure/v2" "golang.org/x/exp/maps" @@ -66,7 +67,7 @@ func newJavaPlugin(ctx context.Context, language string) *javaPlugin { func (p *javaPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) { defaults := moduleconfig.CustomDefaults{ - GeneratedSchemaDir: "src/main/ftl-module-schema", + GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), // Watch defaults to files related to maven and gradle Watch: []string{"pom.xml", "src/**", "build/generated", "target/generated-sources"}, } @@ -78,13 +79,13 @@ func (p *javaPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (modu defaults.LanguageConfig = map[string]any{ "build-tool": JavaBuildToolMaven, } - defaults.Build = "mvn -B package" + defaults.Build = optional.Some("mvn -B package") defaults.DeployDir = "target" } else if fileExists(buildGradle) || fileExists(buildGradleKts) { defaults.LanguageConfig = map[string]any{ "build-tool": JavaBuildToolGradle, } - defaults.Build = "gradle build" + defaults.Build = optional.Some("gradle build") defaults.DeployDir = "build" } else { return moduleconfig.CustomDefaults{}, fmt.Errorf("could not find JVM build file in %s", dir) @@ -247,9 +248,8 @@ func extractKotlinFTLImports(self, dir string) ([]string, error) { return modules, nil } -func buildJava(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { - // TODO: add back - // Deploy: +func buildJava(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { + config := bctx.Config.Abs() logger := log.FromContext(ctx) javaConfig, err := loadJavaConfig(config.LanguageConfig, config.Language) if err != nil { diff --git a/internal/buildengine/languageplugin/java_plugin_test.go b/internal/buildengine/languageplugin/java_plugin_test.go index f7bcb950e4..8cb8b7272e 100644 --- a/internal/buildengine/languageplugin/java_plugin_test.go +++ b/internal/buildengine/languageplugin/java_plugin_test.go @@ -7,6 +7,7 @@ import ( "github.com/TBD54566975/ftl/internal/moduleconfig" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" ) func TestExtractModuleDepsKotlin(t *testing.T) { @@ -32,9 +33,9 @@ func TestJavaConfigDefaults(t *testing.T) { language: "kotlin", dir: "testdata/echokotlin", expected: moduleconfig.CustomDefaults{ - Build: "mvn -B package", + Build: optional.Some("mvn -B package"), DeployDir: "target", - GeneratedSchemaDir: "src/main/ftl-module-schema", + GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), Watch: watch, LanguageConfig: map[string]any{ "build-tool": "maven", @@ -45,9 +46,9 @@ func TestJavaConfigDefaults(t *testing.T) { language: "kotlin", dir: "testdata/externalkotlin", expected: moduleconfig.CustomDefaults{ - Build: "mvn -B package", + Build: optional.Some("mvn -B package"), DeployDir: "target", - GeneratedSchemaDir: "src/main/ftl-module-schema", + GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"), Watch: watch, LanguageConfig: map[string]any{ "build-tool": "maven", diff --git a/internal/buildengine/languageplugin/plugin.go b/internal/buildengine/languageplugin/plugin.go index a3f6e456ac..cfb6944aa1 100644 --- a/internal/buildengine/languageplugin/plugin.go +++ b/internal/buildengine/languageplugin/plugin.go @@ -10,6 +10,7 @@ import ( "github.com/alecthomas/kong" "github.com/alecthomas/types/either" "github.com/alecthomas/types/pubsub" + "github.com/alecthomas/types/result" "github.com/TBD54566975/ftl/internal/bind" "github.com/TBD54566975/ftl/internal/builderrors" @@ -30,6 +31,9 @@ type BuildResult struct { // Files to deploy, relative to the module config's DeployDir Deploy []string + + // Whether the module needs to recalculate its dependencies + InvalidateDependencies bool } // PluginEvent is used to notify of updates from the plugin. @@ -51,12 +55,21 @@ func (e AutoRebuildStartedEvent) ModuleName() string { return e.Module } // AutoRebuildEndedEvent is sent when the plugin ends an automatic rebuild. type AutoRebuildEndedEvent struct { Module string - Result either.Either[BuildResult, error] + Result result.Result[BuildResult] } func (AutoRebuildEndedEvent) pluginEvent() {} func (e AutoRebuildEndedEvent) ModuleName() string { return e.Module } +// BuildContext contains contextual information needed to build. +// +// Any change to the build context would require a new build. +type BuildContext struct { + Config moduleconfig.ModuleConfig + Schema *schema.Schema + Dependencies []string +} + // LanguagePlugin handles building and scaffolding modules in a specific language. type LanguagePlugin interface { // Updates topic for all update events from the plugin @@ -66,10 +79,10 @@ type LanguagePlugin interface { // GetModuleConfigDefaults provides custom defaults for the module config. // // The result may be cached by FTL, so defaulting logic should not be changing due to normal module changes. - // For example it is valid to return defaults based on which build tool is configured within the module directory, + // For example, it is valid to return defaults based on which build tool is configured within the module directory, // as that is not expected to change during normal operation. // It is not recommended to read the module's toml file to determine defaults, as when the toml file is updated, - // the defaults will not be recalculated. + // the module defaults will not be recalculated. ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) // GetCreateModuleFlags returns the flags that can be used to create a module for this language. @@ -79,15 +92,15 @@ type LanguagePlugin interface { CreateModule(ctx context.Context, projConfig projectconfig.Config, moduleConfig moduleconfig.ModuleConfig, flags map[string]string) error // GetDependencies returns the dependencies of the module. - GetDependencies(ctx context.Context, config moduleconfig.ModuleConfig) ([]string, error) + GetDependencies(ctx context.Context, moduleConfig moduleconfig.ModuleConfig) ([]string, error) // Build builds the module with the latest config and schema. // In dev mode, plugin is responsible for automatically rebuilding as relevant files within the module change, // and publishing these automatic builds updates to Updates(). - Build(ctx context.Context, projectRoot string, config moduleconfig.ModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool) (BuildResult, error) + Build(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool) (BuildResult, error) // Kill stops the plugin and cleans up any resources. - Kill(ctx context.Context) error + Kill() error } // PluginFromConfig creates a new language plugin from the given config. @@ -100,7 +113,7 @@ func New(ctx context.Context, bindAllocator *bind.BindAllocator, language string case "rust": return newRustPlugin(ctx), nil default: - return p, fmt.Errorf("unknown language %q", language) + return newExternalPlugin(ctx, bindAllocator.Next(), language) } } @@ -110,11 +123,10 @@ type pluginCommand interface { } type buildCommand struct { - projectRoot string - config moduleconfig.ModuleConfig - schema *schema.Schema - buildEnv []string - devMode bool + BuildContext + projectRoot string + buildEnv []string + rebuildAutomatically bool result chan either.Either[BuildResult, error] } @@ -130,7 +142,7 @@ type getDependenciesCommand struct { func (getDependenciesCommand) pluginCmd() {} -type buildFunc = func(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) +type buildFunc = func(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) type CompilerBuildError struct { err error @@ -175,19 +187,18 @@ func (p *internalPlugin) Updates() *pubsub.Topic[PluginEvent] { return p.updates } -func (p *internalPlugin) Kill(ctx context.Context) error { +func (p *internalPlugin) Kill() error { p.cancel() return nil } -func (p *internalPlugin) Build(ctx context.Context, projectRoot string, config moduleconfig.ModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool) (BuildResult, error) { +func (p *internalPlugin) Build(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, rebuildAutomatically bool) (BuildResult, error) { cmd := buildCommand{ - projectRoot: projectRoot, - config: config, - schema: sch, - buildEnv: buildEnv, - devMode: devMode, - result: make(chan either.Either[BuildResult, error]), + BuildContext: bctx, + projectRoot: projectRoot, + buildEnv: buildEnv, + rebuildAutomatically: rebuildAutomatically, + result: make(chan either.Either[BuildResult, error]), } p.commands <- cmd select { @@ -232,11 +243,10 @@ func (p *internalPlugin) run(ctx context.Context) { // State // This is updated when given explicit build commands and used for automatic rebuilds - var config moduleconfig.ModuleConfig + var bctx BuildContext var projectRoot string - var schema *schema.Schema var buildEnv []string - devMode := false + watching := false for { select { @@ -244,19 +254,18 @@ func (p *internalPlugin) run(ctx context.Context) { switch c := cmd.(type) { case buildCommand: // update state + bctx = c.BuildContext projectRoot = c.projectRoot - config = c.config - schema = c.schema buildEnv = c.buildEnv if watcher == nil { - watcher = watch.NewWatcher(config.Watch...) + watcher = watch.NewWatcher(bctx.Config.Watch...) } // begin watching if needed - if c.devMode && !devMode { - devMode = true - topic, err := watcher.Watch(ctx, time.Second, []string{config.Abs().Dir}) + if c.rebuildAutomatically && !watching { + watching = true + topic, err := watcher.Watch(ctx, time.Second, []string{bctx.Config.Abs().Dir}) if err != nil { c.result <- either.RightOf[BuildResult](fmt.Errorf("failed to start watching: %w", err)) continue @@ -265,7 +274,7 @@ func (p *internalPlugin) run(ctx context.Context) { } // build - result, err := buildAndLoadResult(ctx, projectRoot, config, schema, buildEnv, devMode, watcher, p.buildFunc) + result, err := buildAndLoadResult(ctx, projectRoot, bctx, buildEnv, c.rebuildAutomatically, watcher, p.buildFunc) if err != nil { c.result <- either.RightOf[BuildResult](err) continue @@ -285,18 +294,10 @@ func (p *internalPlugin) run(ctx context.Context) { case watch.WatchEventModuleChanged: // automatic rebuild - p.updates.Publish(AutoRebuildStartedEvent{Module: config.Module}) - result, err := buildAndLoadResult(ctx, projectRoot, config, schema, buildEnv, devMode, watcher, p.buildFunc) - if err != nil { - p.updates.Publish(AutoRebuildEndedEvent{ - Module: config.Module, - Result: either.RightOf[BuildResult](err), - }) - continue - } + p.updates.Publish(AutoRebuildStartedEvent{Module: bctx.Config.Module}) p.updates.Publish(AutoRebuildEndedEvent{ - Module: config.Module, - Result: either.LeftOf[error](result), + Module: bctx.Config.Module, + Result: result.From(buildAndLoadResult(ctx, projectRoot, bctx, buildEnv, true, watcher, p.buildFunc)), }) case watch.WatchEventModuleAdded: // ignore @@ -311,8 +312,8 @@ func (p *internalPlugin) run(ctx context.Context) { } } -func buildAndLoadResult(ctx context.Context, projectRoot string, c moduleconfig.ModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, watcher *watch.Watcher, build buildFunc) (BuildResult, error) { - config := c.Abs() +func buildAndLoadResult(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, watcher *watch.Watcher, build buildFunc) (BuildResult, error) { + config := bctx.Config.Abs() release, err := flock.Acquire(ctx, filepath.Join(config.Dir, ".ftl.lock"), BuildLockTimeout) if err != nil { return BuildResult{}, fmt.Errorf("could not acquire build lock for %v: %w", config.Module, err) @@ -329,7 +330,7 @@ func buildAndLoadResult(ctx context.Context, projectRoot string, c moduleconfig. } transaction := watcher.GetTransaction(config.Dir) - result, err := build(ctx, projectRoot, config, sch, buildEnv, devMode, transaction) + result, err := build(ctx, projectRoot, bctx, buildEnv, devMode, transaction) if err != nil { return BuildResult{}, err } diff --git a/internal/buildengine/languageplugin/rust_plugin.go b/internal/buildengine/languageplugin/rust_plugin.go index 7bf7d4aca3..1063850ec7 100644 --- a/internal/buildengine/languageplugin/rust_plugin.go +++ b/internal/buildengine/languageplugin/rust_plugin.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/alecthomas/kong" + "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/internal/builderrors" "github.com/TBD54566975/ftl/internal/exec" @@ -30,7 +31,7 @@ func newRustPlugin(ctx context.Context) *rustPlugin { func (p *rustPlugin) ModuleConfigDefaults(ctx context.Context, dir string) (moduleconfig.CustomDefaults, error) { return moduleconfig.CustomDefaults{ - Build: "cargo build", + Build: optional.Some("cargo build"), DeployDir: "_ftl/target/debug", Watch: []string{"**/*.rs", "Cargo.toml", "Cargo.lock"}, }, nil @@ -48,7 +49,8 @@ func (p *rustPlugin) GetDependencies(ctx context.Context, config moduleconfig.Mo return nil, fmt.Errorf("not implemented") } -func buildRust(ctx context.Context, projectRoot string, config moduleconfig.AbsModuleConfig, sch *schema.Schema, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { +func buildRust(ctx context.Context, projectRoot string, bctx BuildContext, buildEnv []string, devMode bool, transaction watch.ModifyFilesTransaction) (BuildResult, error) { + config := bctx.Config.Abs() logger := log.FromContext(ctx) logger.Debugf("Using build command '%s'", config.Build) err := exec.Command(ctx, log.Debug, config.Dir+"/_ftl", "bash", "-c", config.Build).RunBuffered(ctx) diff --git a/internal/moduleconfig/moduleconfig.go b/internal/moduleconfig/moduleconfig.go index ff138f0584..745db87a51 100644 --- a/internal/moduleconfig/moduleconfig.go +++ b/internal/moduleconfig/moduleconfig.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/BurntSushi/toml" + "github.com/alecthomas/types/optional" "github.com/go-viper/mapstructure/v2" "github.com/TBD54566975/ftl/internal/slices" @@ -51,10 +52,10 @@ type AbsModuleConfig ModuleConfig type UnvalidatedModuleConfig ModuleConfig type CustomDefaults struct { - Build string DeployDir string - GeneratedSchemaDir string Watch []string + Build optional.Option[string] + GeneratedSchemaDir optional.Option[string] // only the root keys in LanguageConfig are used to find missing values that can be defaulted LanguageConfig map[string]any `toml:"-"` @@ -138,14 +139,14 @@ func (c UnvalidatedModuleConfig) FillDefaultsAndValidate(customDefaults CustomDe } // Custom defaults - if c.Build == "" { - c.Build = customDefaults.Build + if defaultValue, ok := customDefaults.Build.Get(); ok && c.Build == "" { + c.Build = defaultValue } if c.DeployDir == "" { c.DeployDir = customDefaults.DeployDir } - if c.GeneratedSchemaDir == "" { - c.GeneratedSchemaDir = customDefaults.GeneratedSchemaDir + if defaultValue, ok := customDefaults.GeneratedSchemaDir.Get(); ok && c.GeneratedSchemaDir == "" { + c.GeneratedSchemaDir = defaultValue } if c.Watch == nil { c.Watch = customDefaults.Watch diff --git a/internal/moduleconfig/moduleconfig_test.go b/internal/moduleconfig/moduleconfig_test.go index e9a1b372cf..00fbfd055c 100644 --- a/internal/moduleconfig/moduleconfig_test.go +++ b/internal/moduleconfig/moduleconfig_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" ) func TestDefaulting(t *testing.T) { @@ -21,9 +22,9 @@ func TestDefaulting(t *testing.T) { Language: "test", }, defaults: CustomDefaults{ - Build: "build", + Build: optional.Some("build"), DeployDir: "deploydir", - GeneratedSchemaDir: "generatedschemadir", + GeneratedSchemaDir: optional.Some("generatedschemadir"), Watch: []string{"a", "b", "c"}, }, expected: ModuleConfig{ @@ -52,9 +53,9 @@ func TestDefaulting(t *testing.T) { }, }, defaults: CustomDefaults{ - Build: "build", + Build: optional.Some("build"), DeployDir: "deploydir", - GeneratedSchemaDir: "generatedschemadir", + GeneratedSchemaDir: optional.Some("generatedschemadir"), Watch: []string{"a", "b", "c"}, }, expected: ModuleConfig{