Skip to content

Commit

Permalink
chore: remove main DAL (#3701)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Dec 10, 2024
1 parent 6e7490b commit 551a650
Show file tree
Hide file tree
Showing 16 changed files with 119 additions and 1,162 deletions.
16 changes: 16 additions & 0 deletions backend/controller/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/TBD54566975/ftl/internal/configuration/providers"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

type AdminService struct {
Expand All @@ -30,6 +31,15 @@ type SchemaRetriever interface {
GetActiveSchema(ctx context.Context) (*schema.Schema, error)
}

type streamSchemaRetriever struct {
source schemaeventsource.EventSource
}

func (c streamSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
view := c.source.View()
return &schema.Schema{Modules: view.Modules}, nil
}

// NewAdminService creates a new AdminService.
// bindAllocator is optional and should be set if a local client is to be used that accesses schema from disk using language plugins.
func NewAdminService(cm *manager.Manager[configuration.Configuration], sm *manager.Manager[configuration.Secrets], schr SchemaRetriever) *AdminService {
Expand Down Expand Up @@ -263,3 +273,9 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,

return nil
}

func NewSchemaRetreiver(source schemaeventsource.EventSource) SchemaRetriever {
return &streamSchemaRetriever{
source: source,
}
}
5 changes: 1 addition & 4 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/controller/dal"
pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
Expand All @@ -22,17 +21,15 @@ import (
)

type ConsoleService struct {
dal *dal.DAL
admin *admin.AdminService
schemaEventSource schemaeventsource.EventSource
}

var _ pbconsoleconnect.ConsoleServiceHandler = (*ConsoleService)(nil)
var _ timelinev1connect.TimelineServiceHandler = (*ConsoleService)(nil)

func NewService(dal *dal.DAL, admin *admin.AdminService, schemaEventSource schemaeventsource.EventSource) *ConsoleService {
func NewService(admin *admin.AdminService, schemaEventSource schemaeventsource.EventSource) *ConsoleService {
return &ConsoleService{
dal: dal,
admin: admin,
schemaEventSource: schemaEventSource,
}
Expand Down
119 changes: 87 additions & 32 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ import (
"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/controller/artefacts"
"github.com/TBD54566975/ftl/backend/controller/console"
"github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/state"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
deploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
Expand All @@ -65,7 +62,6 @@ import (
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
"github.com/TBD54566975/ftl/internal/sha256"
"github.com/TBD54566975/ftl/internal/slices"
status "github.com/TBD54566975/ftl/internal/terminal"
)

// CommonConfig between the production controller and development server.
Expand Down Expand Up @@ -149,8 +145,8 @@ func Start(
logger.Debugf("Listening on %s", config.Bind)
logger.Debugf("Advertising as %s", config.Advertise)

admin := admin.NewAdminService(cm, sm, svc.dal)
console := console.NewService(svc.dal, admin, schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx)))
admin := admin.NewAdminService(cm, sm, admin.NewSchemaRetreiver(schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx))))
console := console.NewService(admin, schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx)))

g, ctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -182,13 +178,12 @@ type clients struct {
// ControllerListListener is regularly notified of the current list of controllers
// This is often used to update a hash ring to distribute work.
type ControllerListListener interface {
UpdatedControllerList(ctx context.Context, controllers []dalmodel.Controller)
UpdatedControllerList(ctx context.Context, controllers []state.Controller)
}

type Service struct {
conn *sql.DB
leaser leases.Leaser
dal *dal.DAL
key model.ControllerKey
deploymentLogsSink *deploymentLogsSink

Expand Down Expand Up @@ -257,7 +252,6 @@ func New(

pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState)
svc.pubSub = pubSub
svc.dal = dal.New(svc.storage, svc.controllerState)

svc.deploymentLogsSink = newDeploymentLogsSink(ctx)

Expand Down Expand Up @@ -332,7 +326,7 @@ func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.Pr
}

func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusRequest]) (*connect.Response[ftlv1.StatusResponse], error) {
controller := dalmodel.Controller{Key: s.key, Endpoint: s.config.Bind.String()}
controller := state.Controller{Key: s.key, Endpoint: s.config.Bind.String()}
currentState := s.controllerState.View()
runners := currentState.Runners()
status := currentState.GetActiveDeployments()
Expand Down Expand Up @@ -476,48 +470,109 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U
logger := s.getDeploymentLogger(ctx, deploymentKey)
logger.Debugf("Update deployment for: %s", deploymentKey)
if req.Msg.MinReplicas != nil {
err = s.dal.SetDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas))
err = s.setDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas))
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", deploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey)
return nil, fmt.Errorf("could not set deployment replicas: %w", err)
}
}
return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil
}

func (s *Service) setDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error) {

view := s.controllerState.View()
deployment, err := view.GetDeployment(key)
if err != nil {
return fmt.Errorf("could not get deployment: %w", err)
}

err = s.controllerState.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: key, Replicas: minReplicas})
if err != nil {
return fmt.Errorf("could not update deployment replicas: %w", err)
}
if minReplicas == 0 {
err = s.controllerState.Publish(ctx, &state.DeploymentDeactivatedEvent{Key: key, ModuleRemoved: true})
if err != nil {
return fmt.Errorf("could not deactivate deployment: %w", err)
}
} else if deployment.MinReplicas == 0 {
err = s.controllerState.Publish(ctx, &state.DeploymentActivatedEvent{Key: key, ActivatedAt: time.Now(), MinReplicas: minReplicas})
if err != nil {
return fmt.Errorf("could not activate deployment: %w", err)
}
}
timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentUpdated{
DeploymentKey: key,
MinReplicas: minReplicas,
PrevMinReplicas: deployment.MinReplicas,
})

return nil
}

func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.ReplaceDeployRequest]) (*connect.Response[ftlv1.ReplaceDeployResponse], error) {
newDeploymentKey, err := model.ParseDeploymentKey(c.Msg.DeploymentKey)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

logger := s.getDeploymentLogger(ctx, newDeploymentKey)
logger.Debugf("Replace deployment for: %s", newDeploymentKey)

err = s.dal.ReplaceDeployment(ctx, newDeploymentKey, int(c.Msg.MinReplicas))
view := s.controllerState.View()
newDeployment, err := view.GetDeployment(newDeploymentKey)
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
logger.Errorf(err, "Deployment not found: %s", newDeploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
} else if errors.Is(err, dal.ErrReplaceDeploymentAlreadyActive) {
logger.Infof("Reusing deployment: %s", newDeploymentKey)
cs := s.controllerState.View()
dep, err := cs.GetDeployment(newDeploymentKey)
if err == nil {
status.UpdateModuleState(ctx, dep.Module, status.BuildStateDeployed)
} else {
logger.Errorf(err, "Failed to get deployment from database: %s", newDeploymentKey)
}
} else {
logger.Errorf(err, "Could not replace deployment: %s", newDeploymentKey)
return nil, fmt.Errorf("could not replace deployment: %w", err)
logger.Errorf(err, "Deployment not found: %s", newDeploymentKey)
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
}
minReplicas := int(c.Msg.MinReplicas)
err = s.controllerState.Publish(ctx, &state.DeploymentActivatedEvent{Key: newDeploymentKey, ActivatedAt: time.Now(), MinReplicas: minReplicas})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to activate: %w", err)
}

// If there's an existing deployment, set its desired replicas to 0
var replacedDeploymentKey optional.Option[model.DeploymentKey]
// TODO: remove all this, it needs to be event driven
var oldDeployment *state.Deployment
for _, dep := range view.GetActiveDeployments() {
if dep.Module == newDeployment.Module {
oldDeployment = dep
break
}
}
if oldDeployment != nil {
if oldDeployment.Key.String() == newDeploymentKey.String() {
return nil, fmt.Errorf("replace deployment failed: deployment already exists from %v to %v", oldDeployment.Key, newDeploymentKey)
}
err = s.controllerState.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to set new deployment replicas from %v to %v: %w", oldDeployment.Key, newDeploymentKey, err)
}
err = s.controllerState.Publish(ctx, &state.DeploymentDeactivatedEvent{Key: oldDeployment.Key})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to deactivate old deployment %v: %w", oldDeployment.Key, err)
}
replacedDeploymentKey = optional.Some(oldDeployment.Key)
} else {
// Set the desired replicas for the new deployment
err = s.controllerState.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to set replicas for %v: %w", newDeploymentKey, err)
}
}

timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentCreated{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.Module,
MinReplicas: minReplicas,
ReplacedDeployment: replacedDeploymentKey,
})
if err != nil {
return nil, fmt.Errorf("replace deployment failed to create event: %w", err)
}

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}

Expand Down
Loading

0 comments on commit 551a650

Please sign in to comment.