Skip to content

Commit

Permalink
feat: add module update events
Browse files Browse the repository at this point in the history
This pattern allows us to update the runtime metadata for a module
through the SchemaService.
  • Loading branch information
alecthomas committed Dec 3, 2024
1 parent 7e46b4c commit 7089658
Show file tree
Hide file tree
Showing 26 changed files with 1,519 additions and 674 deletions.
5 changes: 3 additions & 2 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ PROTOS_OUT := "backend/protos/xyz/block/ftl/console/v1/console.pb.go " + \
CONSOLE_ROOT + "/src/protos/xyz/block/ftl/v1/ftl_pb.ts " + \
CONSOLE_ROOT + "/src/protos/xyz/block/ftl/schema/v1/schema_pb.ts " + \
CONSOLE_ROOT + "/src/protos/xyz/block/ftl/publish/v1/publish_pb.ts"
GO_SCHEMA_ROOTS := "./internal/schema.Schema ./internal/schema.ModuleRuntimeEvent"
# Configuration for building Docker images
DOCKER_IMAGES := '''
{
Expand Down Expand Up @@ -196,10 +197,10 @@ go2proto:
@mk "{{SCHEMA_OUT}}" : cmd/go2proto internal/schema -- go2proto -o "{{SCHEMA_OUT}}" \
-O 'go_package="github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1;schemapb"' \
-O 'java_multiple_files=true' \
xyz.block.ftl.schema.v1 ./internal/schema.Schema && buf format -w && buf lint
xyz.block.ftl.schema.v1 {{GO_SCHEMA_ROOTS}} && buf format -w && buf lint

# Unconditionally rebuild protos
build-protos-unconditionally: lint-protos pnpm-install go2proto
build-protos-unconditionally: go2proto lint-protos pnpm-install
cd backend/protos && buf generate

# Run integration test(s)
Expand Down
42 changes: 34 additions & 8 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/controller/admin"
Expand Down Expand Up @@ -490,6 +489,33 @@ func (s *Service) PullSchema(ctx context.Context, req *connect.Request[ftlv1.Pul
})
}

func (s *Service) UpdateModuleRuntime(ctx context.Context, req *connect.Request[ftlv1.UpdateModuleRuntimeRequest]) (*connect.Response[ftlv1.UpdateModuleRuntimeResponse], error) {
schemas, err := s.dal.GetActiveDeploymentSchemasByDeploymentKey(ctx)
if err != nil {
return nil, fmt.Errorf("could not get schemas: %w", err)
}
for deployment, module := range schemas {
if module.Name != req.Msg.Module {
continue
}
key, err := model.ParseDeploymentKey(deployment)
if err != nil {
return nil, fmt.Errorf("invalid deployment key: %w", err)
}
if module.Runtime == nil {
module.Runtime = &schema.ModuleRuntime{}
}
event := schema.ModuleRuntimeEventFromProto(req.Msg.Event)
module.Runtime.ApplyEvent(event)
err = s.dal.UpdateModuleSchema(ctx, key, module)
if err != nil {
return nil, fmt.Errorf("could not update schema for module %s: %w", module.Name, err)
}
break
}
return connect.NewResponse(&ftlv1.UpdateModuleRuntimeResponse{}), nil
}

func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.UpdateDeployRequest]) (response *connect.Response[ftlv1.UpdateDeployResponse], err error) {
deploymentKey, err := model.ParseDeploymentKey(req.Msg.DeploymentKey)
if err != nil {
Expand Down Expand Up @@ -1549,15 +1575,15 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
delete(mostRecentDeploymentByModule, name)
}
} else if message, ok := notification.Message.Get(); ok {
moduleSchema := message.Schema.ToProto().(*schemapb.Module) //nolint:forcetypeassert
if moduleSchema.Runtime == nil {
moduleSchema.Runtime = &schemapb.ModuleRuntime{
Language: message.Language,
}
if message.Schema.Runtime == nil {
message.Schema.Runtime = &schema.ModuleRuntime{}
}
moduleSchema.Runtime.CreateTime = timestamppb.New(message.CreatedAt)
moduleSchema.Runtime.MinReplicas = int32(message.MinReplicas)
message.Schema.Runtime.Scaling = &schema.ModuleRuntimeScaling{
MinReplicas: int32(message.MinReplicas),
}
message.Schema.Runtime.CreateTime = message.CreatedAt

moduleSchema := message.Schema.ToProto().(*schemapb.Module) //nolint:forcetypeassert
hasher := sha.New()
data := []byte(moduleSchema.String())
if _, err := hasher.Write(data); err != nil {
Expand Down
32 changes: 25 additions & 7 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/alecthomas/types/optional"
inprocesspubsub "github.com/alecthomas/types/pubsub"
xmaps "golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

aregistry "github.com/TBD54566975/ftl/backend/controller/artefacts"
dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql"
Expand Down Expand Up @@ -209,11 +208,6 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
return in.Digest, in
})

schemaBytes, err := proto.Marshal(moduleSchema.ToProto())
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to marshal schema: %w", err)
}

// TODO(aat): "schema" containing language?
_, err = tx.db.UpsertModule(ctx, language, moduleSchema.Name)
if err != nil {
Expand All @@ -240,7 +234,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
deploymentKey := model.NewDeploymentKey(moduleSchema.Name)

// Create the deployment
err = tx.db.CreateDeployment(ctx, moduleSchema.Name, schemaBytes, deploymentKey)
err = tx.db.CreateDeployment(ctx, moduleSchema.Name, moduleSchema, deploymentKey)
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to create deployment: %w", libdal.TranslatePGError(err))
}
Expand Down Expand Up @@ -565,6 +559,17 @@ func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
return sch, nil
}

// UpdateModuleSchema updates the schema for a deployment in place.
//
// Note that this is racey as the deployment can be updated by another process. This will go away once we ditch the DB.
func (d *DAL) UpdateModuleSchema(ctx context.Context, deployment model.DeploymentKey, module *schema.Module) error {
err := d.db.UpdateDeploymentSchema(ctx, module, deployment)
if err != nil {
return fmt.Errorf("failed to update deployment schema: %w", err)
}
return nil
}

func (d *DAL) GetDeploymentsWithMinReplicas(ctx context.Context) ([]dalmodel.Deployment, error) {
rows, err := d.db.GetDeploymentsWithMinReplicas(ctx)
if err != nil {
Expand Down Expand Up @@ -593,6 +598,19 @@ func (d *DAL) GetActiveDeploymentSchemas(ctx context.Context) ([]*schema.Module,
return slices.Map(rows, func(in dalsql.GetActiveDeploymentSchemasRow) *schema.Module { return in.Schema }), nil
}

// GetActiveDeploymentSchemasByDeploymentKey returns the schema for all active deployments by deployment key.
//
// model.DeploymentKey is not used directly as a key as it's not a valid map key.
func (d *DAL) GetActiveDeploymentSchemasByDeploymentKey(ctx context.Context) (map[string]*schema.Module, error) {
rows, err := d.db.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err))
}
return maps.FromSlice(rows, func(in dalsql.GetActiveDeploymentSchemasRow) (string, *schema.Module) {
return in.Key.String(), in.Schema
}), nil
}

type ProcessRunner struct {
Key model.RunnerKey
Endpoint string
Expand Down
6 changes: 5 additions & 1 deletion backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@ WHERE id = ANY (@ids::BIGINT[]);

-- name: CreateDeployment :exec
INSERT INTO deployments (module_id, "schema", "key")
VALUES ((SELECT id FROM modules WHERE name = @module_name::TEXT LIMIT 1), @schema::BYTEA, @key::deployment_key);
VALUES ((SELECT id FROM modules WHERE name = @module_name::TEXT LIMIT 1), @schema::module_schema_pb, @key::deployment_key);

-- Note that this can result in a race condition if the deployment is being updated by another process. This will go
-- away once we ditch the DB.
--
-- name: UpdateDeploymentSchema :exec
UPDATE deployments
SET schema = @schema::module_schema_pb
WHERE key = @key::deployment_key
RETURNING 1;

-- name: GetArtefactDigests :many
-- Return the digests that exist in the database.
Expand Down
22 changes: 19 additions & 3 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions backend/controller/dal/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dal

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"time"
Expand All @@ -13,26 +12,25 @@ import (
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/sha256"
)

// DeploymentNotification is a notification from the database when a deployment changes.
type DeploymentNotification = dalmodel.Notification[dalmodel.Deployment, model.DeploymentKey, *model.DeploymentKey]

type deploymentState struct {
Key model.DeploymentKey
schemaHash []byte
schemaHash sha256.SHA256
minReplicas int
}

func deploymentStateFromDeployment(deployment dalmodel.Deployment) (deploymentState, error) {
hasher := sha256.New()
data := []byte(deployment.Schema.String())
if _, err := hasher.Write(data); err != nil {
h, err := deployment.Schema.Hash()
if err != nil {
return deploymentState{}, fmt.Errorf("failed to hash schema: %w", err)
}

return deploymentState{
schemaHash: hasher.Sum(nil),
schemaHash: h,
minReplicas: deployment.MinReplicas,
Key: deployment.Key,
}, nil
Expand Down Expand Up @@ -76,7 +74,7 @@ func (d *DAL) PollDeployments(ctx context.Context) {
d.DeploymentChanges.Publish(DeploymentNotification{
Message: optional.Some(deployment),
})
} else if previousState.minReplicas != state.minReplicas {
} else if previousState.schemaHash != state.schemaHash {
logger.Tracef("Changed deployment: %s", name)
d.DeploymentChanges.Publish(DeploymentNotification{
Message: optional.Some(deployment),
Expand Down
Loading

0 comments on commit 7089658

Please sign in to comment.