diff --git a/backend/schema/protobuf_dec.go b/backend/schema/protobuf_dec.go index 347e8518a5..8a8ce05d48 100644 --- a/backend/schema/protobuf_dec.go +++ b/backend/schema/protobuf_dec.go @@ -40,8 +40,8 @@ func declListToSchema(s []*schemapb.Decl) []Decl { func typeToSchema(s *schemapb.Type) Type { switch s := s.Value.(type) { - // case *schemapb.Type_VerbRef: - // return verbRefToSchema(s.VerbRef) + case *schemapb.Type_VerbRef: + return VerbRefFromProto(s.VerbRef) case *schemapb.Type_DataRef: return DataRefFromProto(s.DataRef) case *schemapb.Type_EnumRef: diff --git a/buildengine/engine.go b/buildengine/engine.go index 157e9e2bf6..c954b425bb 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -36,6 +36,15 @@ type Engine struct { controllerSchema *xsync.MapOf[string, *schema.Module] schemaChanges *pubsub.Topic[schemaChange] cancel func() + parallelism int +} + +type Option func(o *Engine) + +func Parallelism(n int) Option { + return func(o *Engine) { + o.parallelism = n + } } // New constructs a new [Engine]. @@ -45,7 +54,7 @@ type Engine struct { // pull in missing schemas. // // "dirs" are directories to scan for local modules. -func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs ...string) (*Engine, error) { +func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs []string, options ...Option) (*Engine, error) { ctx = rpc.ContextWithClient(ctx, client) e := &Engine{ client: client, @@ -53,6 +62,10 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs modules: map[string]Module{}, controllerSchema: xsync.NewMapOf[string, *schema.Module](), schemaChanges: pubsub.New[schemaChange](), + parallelism: runtime.NumCPU(), + } + for _, option := range options { + option(e) } e.controllerSchema.Store("builtin", schema.Builtins()) ctx, cancel := context.WithCancel(ctx) @@ -368,7 +381,7 @@ func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback, // Collect schemas to be inserted into "built" map for subsequent groups. schemas := make(chan *schema.Module, len(group)) wg, ctx := errgroup.WithContext(ctx) - wg.SetLimit(runtime.NumCPU()) + wg.SetLimit(e.parallelism) for _, name := range group { wg.Go(func() error { if mustBuild[name] { diff --git a/buildengine/engine_test.go b/buildengine/engine_test.go index 0a0e13b45f..dba6a77c09 100644 --- a/buildengine/engine_test.go +++ b/buildengine/engine_test.go @@ -13,7 +13,7 @@ import ( func TestEngine(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) - engine, err := buildengine.New(ctx, nil, "testdata/modules/alpha", "testdata/modules/another") + engine, err := buildengine.New(ctx, nil, []string{"testdata/modules/alpha", "testdata/modules/another"}) assert.NoError(t, err) defer engine.Close() diff --git a/cmd/ftl/cmd_build.go b/cmd/ftl/cmd_build.go index 4b10d5737a..312df166cb 100644 --- a/cmd/ftl/cmd_build.go +++ b/cmd/ftl/cmd_build.go @@ -9,12 +9,13 @@ import ( ) type buildCmd struct { - Dirs []string `arg:"" help:"Base directories containing modules." type:"existingdir" required:""` + Parallelism int `short:"j" help:"Number of modules to build in parallel." default:"${numcpu}"` + Dirs []string `arg:"" help:"Base directories containing modules." type:"existingdir" required:""` } func (b *buildCmd) Run(ctx context.Context) error { client := rpc.ClientFromContext[ftlv1connect.ControllerServiceClient](ctx) - engine, err := buildengine.New(ctx, client, b.Dirs...) + engine, err := buildengine.New(ctx, client, b.Dirs, buildengine.Parallelism(b.Parallelism)) if err != nil { return err } diff --git a/cmd/ftl/cmd_deploy.go b/cmd/ftl/cmd_deploy.go index 2c06f3ae95..8ea47241c3 100644 --- a/cmd/ftl/cmd_deploy.go +++ b/cmd/ftl/cmd_deploy.go @@ -9,14 +9,15 @@ import ( ) type deployCmd struct { - Replicas int32 `short:"n" help:"Number of replicas to deploy." default:"1"` - Dirs []string `arg:"" help:"Base directories containing modules." type:"existingdir" required:""` - NoWait bool `help:"Do not wait for deployment to complete." default:"false"` + Parallelism int `short:"j" help:"Number of modules to build in parallel." default:"${numcpu}"` + Replicas int32 `short:"n" help:"Number of replicas to deploy." default:"1"` + Dirs []string `arg:"" help:"Base directories containing modules." type:"existingdir" required:""` + NoWait bool `help:"Do not wait for deployment to complete." default:"false"` } func (d *deployCmd) Run(ctx context.Context) error { client := rpc.ClientFromContext[ftlv1connect.ControllerServiceClient](ctx) - engine, err := buildengine.New(ctx, client, d.Dirs...) + engine, err := buildengine.New(ctx, client, d.Dirs, buildengine.Parallelism(d.Parallelism)) if err != nil { return err } diff --git a/cmd/ftl/cmd_dev.go b/cmd/ftl/cmd_dev.go index 5189d61d76..ad6c8a7170 100644 --- a/cmd/ftl/cmd_dev.go +++ b/cmd/ftl/cmd_dev.go @@ -13,10 +13,11 @@ import ( ) type devCmd struct { - Dirs []string `arg:"" help:"Base directories containing modules." type:"existingdir" required:""` - Watch time.Duration `help:"Watch template directory at this frequency and regenerate on change." default:"500ms"` - NoServe bool `help:"Do not start the FTL server." default:"false"` - ServeCmd serveCmd `embed:""` + Parallelism int `short:"j" help:"Number of modules to build in parallel." default:"${numcpu}"` + Dirs []string `arg:"" help:"Base directories containing modules." type:"existingdir" required:""` + Watch time.Duration `help:"Watch template directory at this frequency and regenerate on change." default:"500ms"` + NoServe bool `help:"Do not start the FTL server." default:"false"` + ServeCmd serveCmd `embed:""` } func (d *devCmd) Run(ctx context.Context) error { @@ -39,7 +40,7 @@ func (d *devCmd) Run(ctx context.Context) error { } g.Go(func() error { - engine, err := buildengine.New(ctx, client, d.Dirs...) + engine, err := buildengine.New(ctx, client, d.Dirs, buildengine.Parallelism(d.Parallelism)) if err != nil { return err } diff --git a/cmd/ftl/main.go b/cmd/ftl/main.go index 0c23a51b23..e9173663a8 100644 --- a/cmd/ftl/main.go +++ b/cmd/ftl/main.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "runtime" + "strconv" "strings" "syscall" @@ -63,6 +64,7 @@ func main() { "version": ftl.Version, "os": runtime.GOOS, "arch": runtime.GOARCH, + "numcpu": strconv.Itoa(runtime.NumCPU()), }, )