Skip to content

Commit

Permalink
feat: language plugins require a bind allocator to be provided (#3069)
Browse files Browse the repository at this point in the history
This is a prerequisite for external language plugins. This PR makes sure
`languageplugin.New()` is called with a bind allocator so that external
plugins will soon able to able to acquire a bind url.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
matt2e and github-actions[bot] authored Oct 10, 2024
1 parent d75804a commit 9300e48
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 49 deletions.
20 changes: 14 additions & 6 deletions backend/controller/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"fmt"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/go-runtime/encoding"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
Expand All @@ -21,19 +23,25 @@ type AdminService struct {
schr SchemaRetriever
cm *manager.Manager[configuration.Configuration]
sm *manager.Manager[configuration.Secrets]
// bindAllocator needs to be set for local client to retrieve schemas from disk using language plugins
bindAllocator optional.Option[*bind.BindAllocator]
}

var _ ftlv1connect.AdminServiceHandler = (*AdminService)(nil)

type SchemaRetriever interface {
GetActiveSchema(ctx context.Context) (*schema.Schema, error)
// BindAllocator is required if the schema is retrieved from disk using language plugins
GetActiveSchema(ctx context.Context, bindAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error)
}

func NewAdminService(cm *manager.Manager[configuration.Configuration], sm *manager.Manager[configuration.Secrets], schr SchemaRetriever) *AdminService {
// NewAdminService creates a new AdminService.
// bindAllocator is optional and should be set if a local client is to be used that accesses schema from disk using language plugins.
func NewAdminService(cm *manager.Manager[configuration.Configuration], sm *manager.Manager[configuration.Secrets], schr SchemaRetriever, bindAllocator optional.Option[*bind.BindAllocator]) *AdminService {
return &AdminService{
schr: schr,
cm: cm,
sm: sm,
schr: schr,
cm: cm,
sm: sm,
bindAllocator: bindAllocator,
}
}

Expand Down Expand Up @@ -246,7 +254,7 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,
}

// If we can't retrieve an active schema, skip validation.
sch, err := s.schr.GetActiveSchema(ctx)
sch, err := s.schr.GetActiveSchema(ctx, s.bindAllocator)
if err != nil {
logger.Debugf("skipping validation; could not get the active schema: %v", err)
return nil
Expand Down
7 changes: 4 additions & 3 deletions backend/controller/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/alecthomas/types/optional"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
Expand All @@ -35,7 +36,7 @@ func TestAdminService(t *testing.T) {
providers.Inline[configuration.Secrets]{},
})
assert.NoError(t, err)
admin := NewAdminService(cm, sm, &diskSchemaRetriever{})
admin := NewAdminService(cm, sm, &diskSchemaRetriever{}, optional.None[*bind.BindAllocator]())
assert.NotZero(t, admin)

expectedEnvarValue, err := json.MarshalIndent(map[string]string{"bar": "barfoo"}, "", " ")
Expand Down Expand Up @@ -199,7 +200,7 @@ var testSchema = schema.MustValidate(&schema.Schema{
type mockSchemaRetriever struct {
}

func (d *mockSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
func (d *mockSchemaRetriever) GetActiveSchema(ctx context.Context, bindAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error) {
return testSchema, nil
}

Expand All @@ -217,7 +218,7 @@ func TestAdminValidation(t *testing.T) {
providers.Inline[configuration.Secrets]{},
})
assert.NoError(t, err)
admin := NewAdminService(cm, sm, &mockSchemaRetriever{})
admin := NewAdminService(cm, sm, &mockSchemaRetriever{}, optional.None[*bind.BindAllocator]())
assert.NotZero(t, admin)

testSetConfig(t, ctx, admin, "batmobile", "color", "Black", "")
Expand Down
15 changes: 10 additions & 5 deletions backend/controller/admin/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/buildengine/languageplugin"
cf "github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
Expand All @@ -24,16 +25,20 @@ type localClient struct {
}

type diskSchemaRetriever struct {
// Omit to use the project root as the deploy root.
// Omit to use the project root as the deploy root (used in tests)
deployRoot optional.Option[string]
}

// NewLocalClient creates a admin client that reads and writes from the provided config and secret managers
func NewLocalClient(cm *manager.Manager[cf.Configuration], sm *manager.Manager[cf.Secrets]) Client {
return &localClient{NewAdminService(cm, sm, &diskSchemaRetriever{})}
func NewLocalClient(cm *manager.Manager[cf.Configuration], sm *manager.Manager[cf.Secrets], bindAllocator *bind.BindAllocator) Client {
return &localClient{NewAdminService(cm, sm, &diskSchemaRetriever{}, optional.Some(bindAllocator))}
}

func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context, bAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error) {
bindAllocator, ok := bAllocator.Get()
if !ok {
return nil, fmt.Errorf("no bind allocator available")
}
path, ok := projectconfig.DefaultConfigPath().Get()
if !ok {
return nil, fmt.Errorf("no project config path available")
Expand All @@ -53,7 +58,7 @@ func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Sche
for _, m := range modules {
go func() {
// Loading a plugin can be expensive. Is there a better way?
plugin, err := languageplugin.New(ctx, m.Language)
plugin, err := languageplugin.New(ctx, bindAllocator, m.Language)
if err != nil {
moduleSchemas <- either.RightOf[*schema.Module](fmt.Errorf("could not load plugin for %s: %w", m.Module, err))
}
Expand Down
24 changes: 18 additions & 6 deletions backend/controller/admin/local_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,41 @@ package admin

import (
"context"
"net/url"
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/bind"
cf "github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
"github.com/TBD54566975/ftl/internal/configuration/routers"
in "github.com/TBD54566975/ftl/internal/integration"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
)

func getDiskSchema(t testing.TB, ctx context.Context) (*schema.Schema, error) {
t.Helper()

bindURL, err := url.Parse("http://127.0.0.1:8893")
assert.NoError(t, err)
bindAllocator, err := bind.NewBindAllocator(bindURL)
assert.NoError(t, err)
dsr := &diskSchemaRetriever{}
return dsr.GetActiveSchema(ctx, optional.Some(bindAllocator))
}

func TestDiskSchemaRetrieverWithBuildArtefact(t *testing.T) {
in.Run(t,
in.WithFTLConfig("ftl-project-dr.toml"),
in.WithoutController(),
in.CopyModule("dischema"),
in.Build("dischema"),
func(t testing.TB, ic in.TestContext) {
dsr := &diskSchemaRetriever{deployRoot: optional.Some[string](ic.WorkingDir())}
sch, err := dsr.GetActiveSchema(ic.Context)
sch, err := getDiskSchema(t, ic.Context)
assert.NoError(t, err)

module, ok := sch.Module("dischema").Get()
Expand All @@ -41,8 +54,7 @@ func TestDiskSchemaRetrieverWithNoSchema(t *testing.T) {
in.WithoutController(),
in.CopyModule("dischema"),
func(t testing.TB, ic in.TestContext) {
dsr := &diskSchemaRetriever{}
_, err := dsr.GetActiveSchema(ic.Context)
_, err := getDiskSchema(t, ic.Context)
assert.Error(t, err)
},
)
Expand All @@ -64,10 +76,10 @@ func TestAdminNoValidationWithNoSchema(t *testing.T) {
assert.NoError(t, err)

dsr := &diskSchemaRetriever{deployRoot: optional.Some(string(t.TempDir()))}
_, err = dsr.GetActiveSchema(ctx)
_, err = dsr.GetActiveSchema(ctx, optional.None[*bind.BindAllocator]())
assert.Error(t, err)

admin := NewAdminService(cm, sm, dsr)
admin := NewAdminService(cm, sm, dsr, optional.None[*bind.BindAllocator]())
testSetConfig(t, ctx, admin, "batmobile", "color", "Red", "")
testSetSecret(t, ctx, admin, "batmobile", "owner", 99, "")
}
3 changes: 2 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
frontend "github.com/TBD54566975/ftl/frontend/console"
"github.com/TBD54566975/ftl/internal/bind"
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/cors"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
Expand Down Expand Up @@ -161,7 +162,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
cm := cf.ConfigFromContext(ctx)
sm := cf.SecretsFromContext(ctx)

admin := admin.NewAdminService(cm, sm, svc.dal)
admin := admin.NewAdminService(cm, sm, svc.dal, optional.None[*bind.BindAllocator]())
console := console.NewService(svc.dal, svc.timeline)

ingressHandler := otelhttp.NewHandler(http.Handler(svc), "ftl.ingress")
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -573,7 +574,7 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment,
}

// GetActiveSchema returns the schema for all active deployments.
func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
func (d *DAL) GetActiveSchema(ctx context.Context, bindAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error) {
deployments, err := d.GetActiveDeployments(ctx)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion bin/hermit.hcl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
env = {
"DBMATE_MIGRATIONS_DIR": "${HERMIT_ENV}/backend/controller/sql/schema",
"DBMATE_NO_DUMP_SCHEMA": "true",
"FTL_ENDPOINT": "http://localhost:8892",
"FTL_ENDPOINT": "http://127.0.0.1:8892",
"FTL_INIT_GO_REPLACE": "github.com/TBD54566975/ftl=${HERMIT_ENV}",
"FTL_SOURCE": "${HERMIT_ENV}",
"OTEL_GRPC_PORT": "4317",
Expand Down
10 changes: 9 additions & 1 deletion frontend/cli/cmd_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/admin"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/routers"
Expand Down Expand Up @@ -87,7 +88,14 @@ func setUpAdminClient(ctx context.Context, config projectconfig.Config) (ctxOut
}
ctx = manager.ContextWithSecrets(ctx, sm)

return ctx, admin.NewLocalClient(cm, sm), nil
// use the cli endpoint to create the bind allocator, but leave the first port unused as it is meant to be reserved by a controller
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint)
if err != nil {
return ctx, client, fmt.Errorf("could not create bind allocator: %w", err)
}
_ = bindAllocator.Next()

return ctx, admin.NewLocalClient(cm, sm, bindAllocator), nil
}
return ctx, adminServiceClient, nil
}
Expand Down
40 changes: 26 additions & 14 deletions frontend/cli/cmd_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"context"
"fmt"
"go/token"
"net/url"
"os"
"path/filepath"
"reflect"
"regexp"
"strings"

"github.com/alecthomas/kong"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/buildengine/languageplugin"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/moduleconfig"
Expand All @@ -32,33 +35,46 @@ type newCmd struct {
// - help text (ftl new go --help)
// - default values
// - environment variable overrides
func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) error {
func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) (optionalPlugin optional.Option[languageplugin.LanguagePlugin], err error) {
if len(args) < 2 {
return nil
return optionalPlugin, nil
} else if args[0] != "new" {
return nil
return optionalPlugin, nil
}

language := args[1]
// Default to `new` command handler if no language is provided, or option is specified on `new` command.
if len(language) == 0 || language[0] == '-' {
return nil
return optionalPlugin, nil
}

newCmdNode, ok := slices.Find(k.Model.Children, func(n *kong.Node) bool {
return n.Name == "new"
})
if !ok {
return fmt.Errorf("could not find new command")
return optionalPlugin, fmt.Errorf("could not find new command")
}

// Too early to use any kong args here so we can't use cli.Endpoint.
// Hardcoding the default bind URL for now.
pluginBind, err := url.Parse("http://127.0.0.1:8893")
if err != nil {
return optionalPlugin, fmt.Errorf("could not parse default bind URL: %w", err)
}
var bindAllocator *bind.BindAllocator
bindAllocator, err = bind.NewBindAllocator(pluginBind)
if err != nil {
return optionalPlugin, fmt.Errorf("could not create bind allocator: %w", err)
}

plugin, err := languageplugin.New(ctx, language)
plugin, err := languageplugin.New(ctx, bindAllocator, language)
if err != nil {
return fmt.Errorf("could not create plugin for %v: %w", language, err)
return optionalPlugin, fmt.Errorf("could not create plugin for %v: %w", language, err)
}

flags, err := plugin.GetCreateModuleFlags(ctx)
if err != nil {
return fmt.Errorf("could not get CLI flags for %v plugin: %w", language, err)
return optionalPlugin, fmt.Errorf("could not get CLI flags for %v plugin: %w", language, err)
}

registry := kong.NewRegistry().RegisterDefaults()
Expand All @@ -73,10 +89,10 @@ func prepareNewCmd(ctx context.Context, k *kong.Kong, args []string) error {
}
}
newCmdNode.Flags = append(newCmdNode.Flags, flags...)
return nil
return optional.Some(plugin), nil
}

func (i newCmd) Run(ctx context.Context, ktctx *kong.Context, config projectconfig.Config) error {
func (i newCmd) Run(ctx context.Context, ktctx *kong.Context, config projectconfig.Config, plugin languageplugin.LanguagePlugin) error {
name, path, err := validateModule(i.Dir, i.Name)
if err != nil {
return err
Expand Down Expand Up @@ -105,10 +121,6 @@ func (i newCmd) Run(ctx context.Context, ktctx *kong.Context, config projectconf
flags[f.Name] = flagValue
}

plugin, err := languageplugin.New(ctx, i.Language)
if err != nil {
return err
}
err = plugin.CreateModule(ctx, config, moduleConfig, flags)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 9300e48

Please sign in to comment.