Skip to content

Commit

Permalink
fix: validate full schema on deploy (#697)
Browse files Browse the repository at this point in the history
This change loads all existing schemas, merges the current deployment's
schema, then validates the entire schema together. This ensures only
valid modules can be deployed to the cluster, but requires deployment in
a specific order.
  • Loading branch information
alecthomas authored Dec 4, 2023
1 parent b237684 commit 1507bc8
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 116 deletions.
25 changes: 22 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/TBD54566975/ftl/backend/common/cors"
"github.com/TBD54566975/ftl/backend/common/log"
ftlmaps "github.com/TBD54566975/ftl/backend/common/maps"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/common/rpc"
"github.com/TBD54566975/ftl/backend/common/rpc/headers"
Expand Down Expand Up @@ -376,13 +377,13 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
}

func (s *Service) GetSchema(ctx context.Context, c *connect.Request[ftlv1.GetSchemaRequest]) (*connect.Response[ftlv1.GetSchemaResponse], error) {
deployments, err := s.dal.GetActiveDeployments(ctx)
schemas, err := s.dal.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, err
}
sch := &schemapb.Schema{
Modules: slices.Map(deployments, func(d dal.Deployment) *schemapb.Module {
return d.Schema.ToProto().(*schemapb.Module) //nolint:forcetypeassert
Modules: slices.Map(schemas, func(d *schema.Module) *schemapb.Module {
return d.ToProto().(*schemapb.Module) //nolint:forcetypeassert
}),
}
return connect.NewResponse(&ftlv1.GetSchemaResponse{Schema: sch}), nil
Expand Down Expand Up @@ -674,11 +675,17 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
logger.Errorf(err, "Missing runtime metadata")
return nil, err
}

module, err := schema.ModuleFromProto(ms)
if err != nil {
logger.Errorf(err, "Invalid module schema")
return nil, fmt.Errorf("%s: %w", "invalid module schema", err)
}
if err := s.validateWholeSchema(ctx, module); err != nil {
logger.Errorf(err, "Invalid module schema")
return nil, fmt.Errorf("%s: %w", "invalid module schema", err)
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)
dname, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
if err != nil {
Expand All @@ -690,6 +697,18 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
return connect.NewResponse(&ftlv1.CreateDeploymentResponse{DeploymentName: dname.String()}), nil
}

// Load schemas for existing modules, combine with our new one, and validate as a whole.
func (s *Service) validateWholeSchema(ctx context.Context, module *schema.Module) error {
existingModules, err := s.dal.GetActiveDeploymentSchemas(ctx)
if err != nil {
return fmt.Errorf("%s: %w", "could not get existing schemas", err)
}
schemaMap := ftlmaps.FromSlice(existingModules, func(el *schema.Module) (string, *schema.Module) { return el.Name, el })
schemaMap[module.Name] = module
fullSchema := &schema.Schema{Modules: maps.Values(schemaMap)}
return schema.Validate(fullSchema)
}

func (s *Service) getDeployment(ctx context.Context, name string) (*model.Deployment, error) {
dkey, err := model.ParseDeploymentName(name)
if err != nil {
Expand Down
26 changes: 19 additions & 7 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (d *DAL) GetStatus(
if err != nil {
return Status{}, fmt.Errorf("%s: %w", "could not get active runners", translatePGError(err))
}
deployments, err := d.db.GetDeployments(ctx, allDeployments)
deployments, err := d.db.GetActiveDeployments(ctx, allDeployments)
if err != nil {
return Status{}, fmt.Errorf("%s: %w", "could not get active deployments", translatePGError(err))
}
Expand All @@ -266,7 +266,7 @@ func (d *DAL) GetStatus(
if err != nil {
return Status{}, fmt.Errorf("%s: %w", "could not get routing table", translatePGError(err))
}
statusDeployments, err := slices.MapErr(deployments, func(in sql.GetDeploymentsRow) (Deployment, error) {
statusDeployments, err := slices.MapErr(deployments, func(in sql.GetActiveDeploymentsRow) (Deployment, error) {
protoSchema := &schemapb.Module{}
if err := proto.Unmarshal(in.Deployment.Schema, protoSchema); err != nil {
return Deployment{}, fmt.Errorf("%q: could not unmarshal schema: %w", in.ModuleName, err)
Expand Down Expand Up @@ -717,14 +717,14 @@ func (d *DAL) GetDeploymentsNeedingReconciliation(ctx context.Context) ([]Reconc

// GetActiveDeployments returns all active deployments.
func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error) {
rows, err := d.db.GetDeployments(ctx, false)
rows, err := d.db.GetActiveDeployments(ctx, false)
if err != nil {
if isNotFound(err) {
return nil, nil
}
return nil, translatePGError(err)
}
deployments, err := slices.MapErr(rows, func(in sql.GetDeploymentsRow) (Deployment, error) {
return slices.MapErr(rows, func(in sql.GetActiveDeploymentsRow) (Deployment, error) {
protoSchema := &schemapb.Module{}
if err := proto.Unmarshal(in.Deployment.Schema, protoSchema); err != nil {
return Deployment{}, fmt.Errorf("%q: could not unmarshal schema: %w", in.ModuleName, err)
Expand All @@ -742,12 +742,24 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]Deployment, error) {
CreatedAt: in.Deployment.CreatedAt,
}, nil
})
}

func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module, error) {
rows, err := d.db.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("%s: %w", "could not get active deployments", translatePGError(err))
}

return deployments, nil
return slices.MapErr(rows, func(in sql.GetActiveDeploymentSchemasRow) (*schema.Module, error) {
protoSchema := &schemapb.Module{}
if err := proto.Unmarshal(in.Schema, protoSchema); err != nil {
return nil, fmt.Errorf("%q: could not unmarshal schema: %w", in.Name, err)
}
modelSchema, err := schema.ModuleFromProto(protoSchema)
if err != nil {
return nil, fmt.Errorf("%q: invalid schema in database: %w", in.Name, err)
}
return modelSchema, nil
})
}

type ProcessRunner struct {
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,17 @@ WHERE sqlc.arg('all')::bool = true
OR r.state <> 'dead'
ORDER BY r.key;

-- name: GetDeployments :many
-- name: GetActiveDeployments :many
SELECT sqlc.embed(d), m.name AS module_name, m.language
FROM deployments d
INNER JOIN modules m on d.module_id = m.id
WHERE sqlc.arg('all')::bool = true
OR min_replicas > 0
ORDER BY d.name;

-- name: GetActiveDeploymentSchemas :many
SELECT name, schema FROM deployments WHERE min_replicas > 0;

-- name: GetProcessList :many
SELECT d.min_replicas,
d.name AS deployment_name,
Expand Down
119 changes: 74 additions & 45 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1507bc8

Please sign in to comment.