Skip to content

Commit

Permalink
chore: remove deployment notifications (#3699)
Browse files Browse the repository at this point in the history
Follow on from #3697
  • Loading branch information
stuartwdouglas authored Dec 10, 2024
1 parent 4b3cf01 commit 6e7490b
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 423 deletions.
94 changes: 41 additions & 53 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
Expand All @@ -19,21 +18,23 @@ import (
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/buildengine"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

type ConsoleService struct {
dal *dal.DAL
admin *admin.AdminService
dal *dal.DAL
admin *admin.AdminService
schemaEventSource schemaeventsource.EventSource
}

var _ pbconsoleconnect.ConsoleServiceHandler = (*ConsoleService)(nil)
var _ timelinev1connect.TimelineServiceHandler = (*ConsoleService)(nil)

func NewService(dal *dal.DAL, admin *admin.AdminService) *ConsoleService {
func NewService(dal *dal.DAL, admin *admin.AdminService, schemaEventSource schemaeventsource.EventSource) *ConsoleService {
return &ConsoleService{
dal: dal,
admin: admin,
dal: dal,
admin: admin,
schemaEventSource: schemaEventSource,
}
}

Expand Down Expand Up @@ -77,57 +78,50 @@ func verbSchemaString(sch *schema.Schema, verb *schema.Verb) (string, error) {
}

func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pbconsole.GetModulesRequest]) (*connect.Response[pbconsole.GetModulesResponse], error) {
deployments, err := c.dal.GetActiveDeployments()
if err != nil {
return nil, err
}

sch := &schema.Schema{
Modules: slices.Map(deployments, func(d dalmodel.Deployment) *schema.Module {
return d.Schema
}),
}
sch.Modules = append(sch.Modules, schema.Builtins())
sch := c.schemaEventSource.View()

nilMap := map[schema.RefKey]map[schema.RefKey]bool{}
var modules []*pbconsole.Module
for _, deployment := range deployments {
for _, mod := range sch.Modules {
if mod.Runtime == nil || mod.Runtime.Deployment == nil {
continue
}
var verbs []*pbconsole.Verb
var data []*pbconsole.Data
var secrets []*pbconsole.Secret
var configs []*pbconsole.Config

for _, decl := range deployment.Schema.Decls {
for _, decl := range mod.Decls {
switch decl := decl.(type) {
case *schema.Verb:
verb, err := verbFromDecl(decl, sch, deployment.Module, nilMap)
verb, err := verbFromDecl(decl, sch, mod.Name, nilMap)
if err != nil {
return nil, err
}
verbs = append(verbs, verb)

case *schema.Data:
data = append(data, dataFromDecl(decl, deployment.Module, nilMap))
data = append(data, dataFromDecl(decl, mod.Name, nilMap))

case *schema.Secret:
secrets = append(secrets, secretFromDecl(decl, deployment.Module, nilMap))
secrets = append(secrets, secretFromDecl(decl, mod.Name, nilMap))

case *schema.Config:
configs = append(configs, configFromDecl(decl, deployment.Module, nilMap))
configs = append(configs, configFromDecl(decl, mod.Name, nilMap))

case *schema.Database, *schema.Enum, *schema.TypeAlias, *schema.Topic:
}
}

modules = append(modules, &pbconsole.Module{
Name: deployment.Module,
DeploymentKey: deployment.Key.String(),
Language: deployment.Language,
Name: mod.Name,
DeploymentKey: mod.Runtime.Deployment.DeploymentKey,
Language: mod.Runtime.Base.Language,
Verbs: verbs,
Data: data,
Secrets: secrets,
Configs: configs,
Schema: deployment.Schema.String(),
Schema: mod.String(),
})
}

Expand All @@ -151,16 +145,16 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
}), nil
}

func moduleFromDeployment(deployment dalmodel.Deployment, sch *schema.Schema, refMap map[schema.RefKey]map[schema.RefKey]bool) (*pbconsole.Module, error) {
module, err := moduleFromDecls(deployment.Schema.Decls, sch, deployment.Module, refMap)
func moduleFromDeployment(deployment *schema.Module, sch *schema.Schema, refMap map[schema.RefKey]map[schema.RefKey]bool) (*pbconsole.Module, error) {
module, err := moduleFromDecls(deployment.Decls, sch, deployment.Name, refMap)
if err != nil {
return nil, err
}

module.Name = deployment.Module
module.DeploymentKey = deployment.Key.String()
module.Language = deployment.Language
module.Schema = deployment.Schema.String()
module.Name = deployment.Name
module.DeploymentKey = deployment.Runtime.Deployment.DeploymentKey
module.Language = deployment.Runtime.Base.Language
module.Schema = deployment.String()

return module, nil
}
Expand Down Expand Up @@ -325,11 +319,6 @@ func getReferencesFromMap(refMap map[schema.RefKey]map[schema.RefKey]bool, modul
}

func (c *ConsoleService) StreamModules(ctx context.Context, req *connect.Request[pbconsole.StreamModulesRequest], stream *connect.ServerStream[pbconsole.StreamModulesResponse]) error {
deploymentChanges := make(chan dal.DeploymentNotification, 32)

// Subscribe to deployment changes.
c.dal.DeploymentChanges.Subscribe(deploymentChanges)
defer c.dal.DeploymentChanges.Unsubscribe(deploymentChanges)

err := c.sendStreamModulesResp(ctx, stream)
if err != nil {
Expand All @@ -341,7 +330,7 @@ func (c *ConsoleService) StreamModules(ctx context.Context, req *connect.Request
case <-ctx.Done():
return nil

case <-deploymentChanges:
case <-c.schemaEventSource.Events():
err = c.sendStreamModulesResp(ctx, stream)
if err != nil {
return err
Expand All @@ -352,17 +341,17 @@ func (c *ConsoleService) StreamModules(ctx context.Context, req *connect.Request

// filterDeployments removes any duplicate modules by selecting the deployment with the
// latest CreatedAt.
func (c *ConsoleService) filterDeployments(unfilteredDeployments []dalmodel.Deployment) []dalmodel.Deployment {
latest := make(map[string]dalmodel.Deployment)
func (c *ConsoleService) filterDeployments(unfilteredDeployments *schema.Schema) []*schema.Module {
latest := make(map[string]*schema.Module)

for _, deployment := range unfilteredDeployments {
if existing, found := latest[deployment.Module]; !found || deployment.CreatedAt.After(existing.CreatedAt) {
latest[deployment.Module] = deployment
for _, deployment := range unfilteredDeployments.Modules {
if existing, found := latest[deployment.Name]; !found || deployment.Runtime.Base.CreateTime.After(existing.Runtime.Base.CreateTime) {
latest[deployment.Name] = deployment

}
}

var result []dalmodel.Deployment
var result []*schema.Module
for _, value := range latest {
result = append(result, value)
}
Expand All @@ -371,15 +360,11 @@ func (c *ConsoleService) filterDeployments(unfilteredDeployments []dalmodel.Depl
}

func (c *ConsoleService) sendStreamModulesResp(ctx context.Context, stream *connect.ServerStream[pbconsole.StreamModulesResponse]) error {
unfilteredDeployments, err := c.dal.GetActiveDeployments()
if err != nil {
return fmt.Errorf("failed to get deployments: %w", err)
}
unfilteredDeployments := c.schemaEventSource.View()

deployments := c.filterDeployments(unfilteredDeployments)
sch := &schema.Schema{
Modules: slices.Map(deployments, func(d dalmodel.Deployment) *schema.Module {
return d.Schema
}),
Modules: deployments,
}
builtin := schema.Builtins()
sch.Modules = append(sch.Modules, builtin)
Expand All @@ -406,6 +391,9 @@ func (c *ConsoleService) sendStreamModulesResp(ctx context.Context, stream *conn

var modules []*pbconsole.Module
for _, deployment := range deployments {
if deployment.Runtime == nil || deployment.Runtime.Deployment == nil {
continue
}
module, err := moduleFromDeployment(deployment, sch, refMap)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 6e7490b

Please sign in to comment.