diff --git a/backend/controller/admin/admin.go b/backend/controller/admin/admin.go index d82fac180c..0d507b5491 100644 --- a/backend/controller/admin/admin.go +++ b/backend/controller/admin/admin.go @@ -15,6 +15,7 @@ import ( "github.com/TBD54566975/ftl/internal/configuration/providers" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) type AdminService struct { @@ -30,6 +31,15 @@ type SchemaRetriever interface { GetActiveSchema(ctx context.Context) (*schema.Schema, error) } +type streamSchemaRetriever struct { + source schemaeventsource.EventSource +} + +func (c streamSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) { + view := c.source.View() + return &schema.Schema{Modules: view.Modules}, nil +} + // NewAdminService creates a new AdminService. // bindAllocator is optional and should be set if a local client is to be used that accesses schema from disk using language plugins. func NewAdminService(cm *manager.Manager[configuration.Configuration], sm *manager.Manager[configuration.Secrets], schr SchemaRetriever) *AdminService { @@ -263,3 +273,9 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool, return nil } + +func NewSchemaRetreiver(source schemaeventsource.EventSource) SchemaRetriever { + return &streamSchemaRetriever{ + source: source, + } +} diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index 060fed54a5..87d2f890e0 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -8,7 +8,6 @@ import ( "connectrpc.com/connect" "github.com/TBD54566975/ftl/backend/controller/admin" - "github.com/TBD54566975/ftl/backend/controller/dal" pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" @@ -22,7 +21,6 @@ import ( ) type ConsoleService struct { - dal *dal.DAL admin *admin.AdminService schemaEventSource schemaeventsource.EventSource } @@ -30,9 +28,8 @@ type ConsoleService struct { var _ pbconsoleconnect.ConsoleServiceHandler = (*ConsoleService)(nil) var _ timelinev1connect.TimelineServiceHandler = (*ConsoleService)(nil) -func NewService(dal *dal.DAL, admin *admin.AdminService, schemaEventSource schemaeventsource.EventSource) *ConsoleService { +func NewService(admin *admin.AdminService, schemaEventSource schemaeventsource.EventSource) *ConsoleService { return &ConsoleService{ - dal: dal, admin: admin, schemaEventSource: schemaEventSource, } diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 6ab609051e..f5187d6d8b 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -32,14 +32,11 @@ import ( "github.com/TBD54566975/ftl/backend/controller/admin" "github.com/TBD54566975/ftl/backend/controller/artefacts" "github.com/TBD54566975/ftl/backend/controller/console" - "github.com/TBD54566975/ftl/backend/controller/dal" - dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/pubsub" "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/backend/controller/state" - "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect" ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" deploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect" @@ -65,7 +62,6 @@ import ( "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/sha256" "github.com/TBD54566975/ftl/internal/slices" - status "github.com/TBD54566975/ftl/internal/terminal" ) // CommonConfig between the production controller and development server. @@ -149,8 +145,8 @@ func Start( logger.Debugf("Listening on %s", config.Bind) logger.Debugf("Advertising as %s", config.Advertise) - admin := admin.NewAdminService(cm, sm, svc.dal) - console := console.NewService(svc.dal, admin, schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx))) + admin := admin.NewAdminService(cm, sm, admin.NewSchemaRetreiver(schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx)))) + console := console.NewService(admin, schemaeventsource.New(ctx, rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx))) g, ctx := errgroup.WithContext(ctx) @@ -182,13 +178,12 @@ type clients struct { // ControllerListListener is regularly notified of the current list of controllers // This is often used to update a hash ring to distribute work. type ControllerListListener interface { - UpdatedControllerList(ctx context.Context, controllers []dalmodel.Controller) + UpdatedControllerList(ctx context.Context, controllers []state.Controller) } type Service struct { conn *sql.DB leaser leases.Leaser - dal *dal.DAL key model.ControllerKey deploymentLogsSink *deploymentLogsSink @@ -257,7 +252,6 @@ func New( pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState) svc.pubSub = pubSub - svc.dal = dal.New(svc.storage, svc.controllerState) svc.deploymentLogsSink = newDeploymentLogsSink(ctx) @@ -332,7 +326,7 @@ func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.Pr } func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusRequest]) (*connect.Response[ftlv1.StatusResponse], error) { - controller := dalmodel.Controller{Key: s.key, Endpoint: s.config.Bind.String()} + controller := state.Controller{Key: s.key, Endpoint: s.config.Bind.String()} currentState := s.controllerState.View() runners := currentState.Runners() status := currentState.GetActiveDeployments() @@ -476,12 +470,8 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U logger := s.getDeploymentLogger(ctx, deploymentKey) logger.Debugf("Update deployment for: %s", deploymentKey) if req.Msg.MinReplicas != nil { - err = s.dal.SetDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas)) + err = s.setDeploymentReplicas(ctx, deploymentKey, int(*req.Msg.MinReplicas)) if err != nil { - if errors.Is(err, libdal.ErrNotFound) { - logger.Errorf(err, "Deployment not found: %s", deploymentKey) - return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found")) - } logger.Errorf(err, "Could not set deployment replicas: %s", deploymentKey) return nil, fmt.Errorf("could not set deployment replicas: %w", err) } @@ -489,35 +479,100 @@ func (s *Service) UpdateDeploy(ctx context.Context, req *connect.Request[ftlv1.U return connect.NewResponse(&ftlv1.UpdateDeployResponse{}), nil } +func (s *Service) setDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error) { + + view := s.controllerState.View() + deployment, err := view.GetDeployment(key) + if err != nil { + return fmt.Errorf("could not get deployment: %w", err) + } + + err = s.controllerState.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: key, Replicas: minReplicas}) + if err != nil { + return fmt.Errorf("could not update deployment replicas: %w", err) + } + if minReplicas == 0 { + err = s.controllerState.Publish(ctx, &state.DeploymentDeactivatedEvent{Key: key, ModuleRemoved: true}) + if err != nil { + return fmt.Errorf("could not deactivate deployment: %w", err) + } + } else if deployment.MinReplicas == 0 { + err = s.controllerState.Publish(ctx, &state.DeploymentActivatedEvent{Key: key, ActivatedAt: time.Now(), MinReplicas: minReplicas}) + if err != nil { + return fmt.Errorf("could not activate deployment: %w", err) + } + } + timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentUpdated{ + DeploymentKey: key, + MinReplicas: minReplicas, + PrevMinReplicas: deployment.MinReplicas, + }) + + return nil +} + func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.ReplaceDeployRequest]) (*connect.Response[ftlv1.ReplaceDeployResponse], error) { newDeploymentKey, err := model.ParseDeploymentKey(c.Msg.DeploymentKey) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) } - logger := s.getDeploymentLogger(ctx, newDeploymentKey) logger.Debugf("Replace deployment for: %s", newDeploymentKey) - err = s.dal.ReplaceDeployment(ctx, newDeploymentKey, int(c.Msg.MinReplicas)) + view := s.controllerState.View() + newDeployment, err := view.GetDeployment(newDeploymentKey) if err != nil { - if errors.Is(err, libdal.ErrNotFound) { - logger.Errorf(err, "Deployment not found: %s", newDeploymentKey) - return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found")) - } else if errors.Is(err, dal.ErrReplaceDeploymentAlreadyActive) { - logger.Infof("Reusing deployment: %s", newDeploymentKey) - cs := s.controllerState.View() - dep, err := cs.GetDeployment(newDeploymentKey) - if err == nil { - status.UpdateModuleState(ctx, dep.Module, status.BuildStateDeployed) - } else { - logger.Errorf(err, "Failed to get deployment from database: %s", newDeploymentKey) - } - } else { - logger.Errorf(err, "Could not replace deployment: %s", newDeploymentKey) - return nil, fmt.Errorf("could not replace deployment: %w", err) + logger.Errorf(err, "Deployment not found: %s", newDeploymentKey) + return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found")) + } + minReplicas := int(c.Msg.MinReplicas) + err = s.controllerState.Publish(ctx, &state.DeploymentActivatedEvent{Key: newDeploymentKey, ActivatedAt: time.Now(), MinReplicas: minReplicas}) + if err != nil { + return nil, fmt.Errorf("replace deployment failed to activate: %w", err) + } + + // If there's an existing deployment, set its desired replicas to 0 + var replacedDeploymentKey optional.Option[model.DeploymentKey] + // TODO: remove all this, it needs to be event driven + var oldDeployment *state.Deployment + for _, dep := range view.GetActiveDeployments() { + if dep.Module == newDeployment.Module { + oldDeployment = dep + break + } + } + if oldDeployment != nil { + if oldDeployment.Key.String() == newDeploymentKey.String() { + return nil, fmt.Errorf("replace deployment failed: deployment already exists from %v to %v", oldDeployment.Key, newDeploymentKey) + } + err = s.controllerState.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas}) + if err != nil { + return nil, fmt.Errorf("replace deployment failed to set new deployment replicas from %v to %v: %w", oldDeployment.Key, newDeploymentKey, err) + } + err = s.controllerState.Publish(ctx, &state.DeploymentDeactivatedEvent{Key: oldDeployment.Key}) + if err != nil { + return nil, fmt.Errorf("replace deployment failed to deactivate old deployment %v: %w", oldDeployment.Key, err) + } + replacedDeploymentKey = optional.Some(oldDeployment.Key) + } else { + // Set the desired replicas for the new deployment + err = s.controllerState.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas}) + if err != nil { + return nil, fmt.Errorf("replace deployment failed to set replicas for %v: %w", newDeploymentKey, err) } } + timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentCreated{ + DeploymentKey: newDeploymentKey, + Language: newDeployment.Language, + ModuleName: newDeployment.Module, + MinReplicas: minReplicas, + ReplacedDeployment: replacedDeploymentKey, + }) + if err != nil { + return nil, fmt.Errorf("replace deployment failed to create event: %w", err) + } + return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go deleted file mode 100644 index 7d9aedb6ad..0000000000 --- a/backend/controller/dal/dal.go +++ /dev/null @@ -1,170 +0,0 @@ -// Package dal provides a data abstraction layer for the Controller -package dal - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/alecthomas/types/optional" - xmaps "golang.org/x/exp/maps" - - aregistry "github.com/TBD54566975/ftl/backend/controller/artefacts" - "github.com/TBD54566975/ftl/backend/controller/pubsub" - "github.com/TBD54566975/ftl/backend/controller/state" - "github.com/TBD54566975/ftl/backend/libdal" - "github.com/TBD54566975/ftl/backend/timeline" - "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/schema" -) - -func New(registry aregistry.Service, state state.ControllerState) *DAL { - var d *DAL - d = &DAL{ - registry: registry, - state: state, - } - - return d -} - -type DAL struct { - pubsub *pubsub.Service - registry aregistry.Service - state state.ControllerState -} - -// SetDeploymentReplicas activates the given deployment. -func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error) { - - view := d.state.View() - deployment, err := view.GetDeployment(key) - if err != nil { - return fmt.Errorf("could not get deployment: %w", err) - } - - err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: key, Replicas: minReplicas}) - if err != nil { - return libdal.TranslatePGError(err) - } - if minReplicas == 0 { - err = d.state.Publish(ctx, &state.DeploymentDeactivatedEvent{Key: key, ModuleRemoved: true}) - if err != nil { - return libdal.TranslatePGError(err) - } - } else if deployment.MinReplicas == 0 { - err = d.state.Publish(ctx, &state.DeploymentActivatedEvent{Key: key, ActivatedAt: time.Now(), MinReplicas: minReplicas}) - if err != nil { - return libdal.TranslatePGError(err) - } - } - timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentUpdated{ - DeploymentKey: key, - MinReplicas: minReplicas, - PrevMinReplicas: int(deployment.MinReplicas), - }) - - return nil -} - -var ErrReplaceDeploymentAlreadyActive = errors.New("deployment already active") - -// ReplaceDeployment replaces an old deployment of a module with a new deployment. -// -// returns ErrReplaceDeploymentAlreadyActive if the new deployment is already active. -func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.DeploymentKey, minReplicas int) (err error) { - view := d.state.View() - newDeployment, err := view.GetDeployment(newDeploymentKey) - if err != nil { - return fmt.Errorf("replace deployment failed to get deployment for %v: %w", newDeploymentKey, libdal.TranslatePGError(err)) - } - - err = d.state.Publish(ctx, &state.DeploymentActivatedEvent{Key: newDeploymentKey, ActivatedAt: time.Now(), MinReplicas: minReplicas}) - if err != nil { - return libdal.TranslatePGError(err) - } - - // If there's an existing deployment, set its desired replicas to 0 - var replacedDeploymentKey optional.Option[model.DeploymentKey] - // TODO: remove all this, it needs to be event driven - var oldDeployment *state.Deployment - for _, dep := range view.GetActiveDeployments() { - if dep.Module == newDeployment.Module { - oldDeployment = dep - break - } - } - if oldDeployment != nil { - if oldDeployment.Key.String() == newDeploymentKey.String() { - return fmt.Errorf("replace deployment failed: deployment already exists from %v to %v: %w", oldDeployment.Key, newDeploymentKey, ErrReplaceDeploymentAlreadyActive) - } - err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas}) - if err != nil { - return fmt.Errorf("replace deployment failed to set new deployment replicas from %v to %v: %w", oldDeployment.Key, newDeploymentKey, libdal.TranslatePGError(err)) - } - err = d.state.Publish(ctx, &state.DeploymentDeactivatedEvent{Key: oldDeployment.Key}) - if err != nil { - return libdal.TranslatePGError(err) - } - replacedDeploymentKey = optional.Some(oldDeployment.Key) - } else { - // Set the desired replicas for the new deployment - err = d.state.Publish(ctx, &state.DeploymentReplicasUpdatedEvent{Key: newDeploymentKey, Replicas: minReplicas}) - if err != nil { - return fmt.Errorf("replace deployment failed to set replicas for %v: %w", newDeploymentKey, libdal.TranslatePGError(err)) - } - } - - timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentCreated{ - DeploymentKey: newDeploymentKey, - Language: newDeployment.Language, - ModuleName: newDeployment.Module, - MinReplicas: minReplicas, - ReplacedDeployment: replacedDeploymentKey, - }) - if err != nil { - return fmt.Errorf("replace deployment failed to create event: %w", libdal.TranslatePGError(err)) - } - return nil -} - -// GetActiveSchema returns the schema for all active deployments. -func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) { - view := d.state.View() - deployments := view.GetActiveDeployments() - - schemaMap := map[string]*schema.Module{} - timeMap := map[string]time.Time{} - for _, dep := range deployments { - if _, ok := schemaMap[dep.Module]; ok { - if timeMap[dep.Module].Before(dep.CreatedAt) { - continue - } - } - // We only take the older ones - // If new ones exist they are not live yet - // Or the old ones would be gone - schemaMap[dep.Module] = dep.Schema - timeMap[dep.Module] = dep.CreatedAt - } - fullSchema := &schema.Schema{Modules: xmaps.Values(schemaMap)} - sch, err := schema.ValidateSchema(fullSchema) - if err != nil { - return nil, fmt.Errorf("could not validate schema: %w", err) - } - return sch, nil -} - -type ProcessRunner struct { - Key model.RunnerKey - Endpoint string - Labels model.Labels -} - -type Process struct { - Deployment model.DeploymentKey - MinReplicas int - Labels model.Labels - Runner optional.Option[ProcessRunner] -} diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go deleted file mode 100644 index b8ef228757..0000000000 --- a/backend/controller/dal/dal_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package dal - -import ( - "context" - "net/url" - "sync" - "testing" - "time" - - "github.com/alecthomas/assert/v2" - - "github.com/TBD54566975/ftl/backend/controller/artefacts" - "github.com/TBD54566975/ftl/backend/controller/state" - "github.com/TBD54566975/ftl/backend/timeline" - "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/schema" - "github.com/TBD54566975/ftl/internal/sha256" -) - -func TestDAL(t *testing.T) { - ctx := log.ContextWithNewDefaultLogger(context.Background()) - timelineEndpoint, err := url.Parse("http://localhost:8080") - assert.NoError(t, err) - ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint)) - - dal := New(artefacts.NewForTesting(), state.NewInMemoryState()) - - module := &schema.Module{Name: "test"} - var deploymentKey model.DeploymentKey - t.Run("CreateDeployment", func(t *testing.T) { - deploymentKey = model.NewDeploymentKey(module.Name) - err = dal.state.Publish(ctx, &state.DeploymentCreatedEvent{ - Key: deploymentKey, - CreatedAt: time.Now(), - Module: module.Name, - Schema: &schema.Module{Name: module.Name}, - }) - assert.NoError(t, err) - }) - - t.Run("SetDeploymentReplicas", func(t *testing.T) { - err := dal.SetDeploymentReplicas(ctx, deploymentKey, 1) - assert.NoError(t, err) - }) -} - -func TestCreateArtefactConflict(t *testing.T) { - ctx := log.ContextWithNewDefaultLogger(context.Background()) - - dal := New(artefacts.NewForTesting(), state.NewInMemoryState()) - - idch := make(chan sha256.SHA256, 2) - - wg := sync.WaitGroup{} - - wg.Add(2) - createContent := func() { - defer wg.Done() - digest, err := dal.registry.Upload(ctx, artefacts.Artefact{Content: []byte("content")}) - assert.NoError(t, err) - time.Sleep(time.Second * 2) - assert.NoError(t, err) - idch <- digest - } - - go createContent() - go createContent() - - wg.Wait() - - ids := []sha256.SHA256{} - - for range 2 { - select { - case id := <-idch: - ids = append(ids, id) - case <-time.After(time.Second * 3): - t.Fatal("Timed out waiting for artefact creation") - } - } - assert.Equal(t, 2, len(ids)) - assert.Equal(t, ids[0], ids[1]) -} diff --git a/backend/controller/dal/internal/sql/async_queries.sql.go b/backend/controller/dal/internal/sql/async_queries.sql.go deleted file mode 100644 index 9388518d31..0000000000 --- a/backend/controller/dal/internal/sql/async_queries.sql.go +++ /dev/null @@ -1,88 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.27.0 -// source: async_queries.sql - -package sql - -import ( - "context" - "encoding/json" - "time" - - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" - "github.com/TBD54566975/ftl/internal/schema" - "github.com/alecthomas/types/optional" -) - -const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one -SELECT count(*) -FROM async_calls -WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') -` - -func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { - row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) - var count int64 - err := row.Scan(&count) - return count, err -} - -const createAsyncCall = `-- name: CreateAsyncCall :one -INSERT INTO async_calls ( - scheduled_at, - verb, - origin, - request, - remaining_attempts, - backoff, - max_backoff, - catch_verb, - parent_request_key, - trace_context -) -VALUES ( - $1::TIMESTAMPTZ, - $2, - $3, - $4, - $5, - $6::interval, - $7::interval, - $8, - $9, - $10::jsonb -) -RETURNING id -` - -type CreateAsyncCallParams struct { - ScheduledAt time.Time - Verb schema.RefKey - Origin string - Request json.RawMessage - RemainingAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] - ParentRequestKey optional.Option[string] - TraceContext json.RawMessage -} - -func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { - row := q.db.QueryRowContext(ctx, createAsyncCall, - arg.ScheduledAt, - arg.Verb, - arg.Origin, - arg.Request, - arg.RemainingAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - arg.ParentRequestKey, - arg.TraceContext, - ) - var id int64 - err := row.Scan(&id) - return id, err -} diff --git a/backend/controller/dal/internal/sql/db.go b/backend/controller/dal/internal/sql/db.go deleted file mode 100644 index 0e0973111c..0000000000 --- a/backend/controller/dal/internal/sql/db.go +++ /dev/null @@ -1,31 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.27.0 - -package sql - -import ( - "context" - "database/sql" -) - -type DBTX interface { - ExecContext(context.Context, string, ...interface{}) (sql.Result, error) - PrepareContext(context.Context, string) (*sql.Stmt, error) - QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) - QueryRowContext(context.Context, string, ...interface{}) *sql.Row -} - -func New(db DBTX) *Queries { - return &Queries{db: db} -} - -type Queries struct { - db DBTX -} - -func (q *Queries) WithTx(tx *sql.Tx) *Queries { - return &Queries{ - db: tx, - } -} diff --git a/backend/controller/dal/internal/sql/models.go b/backend/controller/dal/internal/sql/models.go deleted file mode 100644 index bcbe0b6e25..0000000000 --- a/backend/controller/dal/internal/sql/models.go +++ /dev/null @@ -1,91 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.27.0 - -package sql - -import ( - "database/sql/driver" - "encoding/json" - "fmt" - "time" - - "github.com/TBD54566975/ftl/internal/model" - "github.com/alecthomas/types/optional" - "github.com/sqlc-dev/pqtype" -) - -type TopicSubscriptionState string - -const ( - TopicSubscriptionStateIdle TopicSubscriptionState = "idle" - TopicSubscriptionStateExecuting TopicSubscriptionState = "executing" -) - -func (e *TopicSubscriptionState) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = TopicSubscriptionState(s) - case string: - *e = TopicSubscriptionState(s) - default: - return fmt.Errorf("unsupported scan type for TopicSubscriptionState: %T", src) - } - return nil -} - -type NullTopicSubscriptionState struct { - TopicSubscriptionState TopicSubscriptionState - Valid bool // Valid is true if TopicSubscriptionState is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullTopicSubscriptionState) Scan(value interface{}) error { - if value == nil { - ns.TopicSubscriptionState, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.TopicSubscriptionState.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullTopicSubscriptionState) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.TopicSubscriptionState), nil -} - -type Topic struct { - ID int64 - Key model.TopicKey - CreatedAt time.Time - Name string - Type string - Head optional.Option[int64] - ModuleName string -} - -type TopicEvent struct { - ID int64 - CreatedAt time.Time - Key model.TopicEventKey - TopicID int64 - Caller optional.Option[string] - RequestKey optional.Option[string] - TraceContext pqtype.NullRawMessage - Payload json.RawMessage -} - -type TopicSubscription struct { - ID int64 - Key model.SubscriptionKey - CreatedAt time.Time - TopicID int64 - Name string - Cursor optional.Option[int64] - State TopicSubscriptionState - DeploymentKey model.DeploymentKey - ModuleName string -} diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go deleted file mode 100644 index 2ccff303d4..0000000000 --- a/backend/controller/dal/internal/sql/querier.go +++ /dev/null @@ -1,37 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.27.0 - -package sql - -import ( - "context" - - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" - "github.com/TBD54566975/ftl/internal/model" - "github.com/alecthomas/types/optional" -) - -type Querier interface { - BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error - CompleteEventForSubscription(ctx context.Context, name string, module string) error - DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) - DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) - GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) - GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) - GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) - // Results may not be ready to be scheduled yet due to event consumption delay - // Sorting ensures that brand new events (that may not be ready for consumption) - // don't prevent older events from being consumed - // We also make sure that the subscription belongs to a deployment that has at least one runner - GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) - GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) - GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) - InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error - PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error - SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error - UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) - UpsertTopic(ctx context.Context, arg UpsertTopicParams) error -} - -var _ Querier = (*Queries)(nil) diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go deleted file mode 100644 index 0c3f092ffe..0000000000 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ /dev/null @@ -1,526 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.27.0 -// source: queries.sql - -package sql - -import ( - "context" - "encoding/json" - - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" - "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/schema" - "github.com/alecthomas/types/optional" - "github.com/sqlc-dev/pqtype" -) - -const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec -WITH event AS ( - SELECT id, created_at, key, topic_id, caller, request_key, trace_context, payload - FROM topic_events - WHERE "key" = $2::topic_event_key -) -UPDATE topic_subscriptions -SET state = 'executing', - cursor = (SELECT id FROM event) -WHERE key = $1::subscription_key -` - -func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error { - _, err := q.db.ExecContext(ctx, beginConsumingTopicEvent, subscription, event) - return err -} - -const completeEventForSubscription = `-- name: CompleteEventForSubscription :exec -UPDATE topic_subscriptions -SET state = 'idle' -WHERE name = $1::TEXT - AND module_name = $2::TEXT -` - -func (q *Queries) CompleteEventForSubscription(ctx context.Context, name string, module string) error { - _, err := q.db.ExecContext(ctx, completeEventForSubscription, name, module) - return err -} - -const deleteSubscribers = `-- name: DeleteSubscribers :many -DELETE FROM topic_subscribers -WHERE deployment_key = $1::deployment_key -RETURNING topic_subscribers.key -` - -func (q *Queries) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) { - rows, err := q.db.QueryContext(ctx, deleteSubscribers, deployment) - if err != nil { - return nil, err - } - defer rows.Close() - var items []model.SubscriberKey - for rows.Next() { - var key model.SubscriberKey - if err := rows.Scan(&key); err != nil { - return nil, err - } - items = append(items, key) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const deleteSubscriptions = `-- name: DeleteSubscriptions :many -DELETE FROM topic_subscriptions -WHERE deployment_key = $1::deployment_key -RETURNING topic_subscriptions.key -` - -func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) { - rows, err := q.db.QueryContext(ctx, deleteSubscriptions, deployment) - if err != nil { - return nil, err - } - defer rows.Close() - var items []model.SubscriptionKey - for rows.Next() { - var key model.SubscriptionKey - if err := rows.Scan(&key); err != nil { - return nil, err - } - items = append(items, key) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getNextEventForSubscription = `-- name: GetNextEventForSubscription :one -WITH cursor AS ( - SELECT - created_at, - id - FROM topic_events - WHERE "key" = $3::topic_event_key -) -SELECT events."key" as event, - events.payload, - events.created_at, - events.caller, - events.request_key, - events.trace_context, - NOW() - events.created_at >= $1::interval AS ready -FROM topics - LEFT JOIN topic_events as events ON events.topic_id = topics.id -WHERE topics.key = $2::topic_key - AND (events.created_at, events.id) > (SELECT COALESCE(MAX(cursor.created_at), '1900-01-01'), COALESCE(MAX(cursor.id), 0) FROM cursor) -ORDER BY events.created_at, events.id -LIMIT 1 -` - -type GetNextEventForSubscriptionRow struct { - Event optional.Option[model.TopicEventKey] - Payload pqtype.NullRawMessage - CreatedAt sqltypes.OptionalTime - Caller optional.Option[string] - RequestKey optional.Option[string] - TraceContext pqtype.NullRawMessage - Ready bool -} - -func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) { - row := q.db.QueryRowContext(ctx, getNextEventForSubscription, consumptionDelay, topic, cursor) - var i GetNextEventForSubscriptionRow - err := row.Scan( - &i.Event, - &i.Payload, - &i.CreatedAt, - &i.Caller, - &i.RequestKey, - &i.TraceContext, - &i.Ready, - ) - return i, err -} - -const getRandomSubscriber = `-- name: GetRandomSubscriber :one -SELECT - subscribers.sink as sink, - subscribers.retry_attempts as retry_attempts, - subscribers.backoff as backoff, - subscribers.max_backoff as max_backoff, - subscribers.catch_verb as catch_verb, - subscribers.deployment_key as deployment_key -FROM topic_subscribers as subscribers - JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id -WHERE topic_subscriptions.key = $1::subscription_key -ORDER BY RANDOM() -LIMIT 1 -` - -type GetRandomSubscriberRow struct { - Sink schema.RefKey - RetryAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] - DeploymentKey model.DeploymentKey -} - -func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) { - row := q.db.QueryRowContext(ctx, getRandomSubscriber, key) - var i GetRandomSubscriberRow - err := row.Scan( - &i.Sink, - &i.RetryAttempts, - &i.Backoff, - &i.MaxBackoff, - &i.CatchVerb, - &i.DeploymentKey, - ) - return i, err -} - -const getSubscription = `-- name: GetSubscription :one -SELECT id, key, created_at, topic_id, name, cursor, state, deployment_key, module_name -FROM topic_subscriptions -WHERE name = $1::TEXT - AND module_name = $2::TEXT -` - -func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) { - row := q.db.QueryRowContext(ctx, getSubscription, column1, column2) - var i TopicSubscription - err := row.Scan( - &i.ID, - &i.Key, - &i.CreatedAt, - &i.TopicID, - &i.Name, - &i.Cursor, - &i.State, - &i.DeploymentKey, - &i.ModuleName, - ) - return i, err -} - -const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many -SELECT - subs.key::subscription_key as key, - curser.key as cursor, - topics.key::topic_key as topic, - subs.name, - deployment_key as deployment_key, - curser.request_key as request_key -FROM topic_subscriptions subs - LEFT JOIN topics ON subs.topic_id = topics.id - LEFT JOIN topic_events curser ON subs.cursor = curser.id -WHERE subs.cursor IS DISTINCT FROM topics.head - AND subs.state = 'idle' -ORDER BY curser.created_at -LIMIT 3 - FOR UPDATE OF subs SKIP LOCKED -` - -type GetSubscriptionsNeedingUpdateRow struct { - Key model.SubscriptionKey - Cursor optional.Option[model.TopicEventKey] - Topic model.TopicKey - Name string - DeploymentKey model.DeploymentKey - RequestKey optional.Option[string] -} - -// Results may not be ready to be scheduled yet due to event consumption delay -// Sorting ensures that brand new events (that may not be ready for consumption) -// don't prevent older events from being consumed -// We also make sure that the subscription belongs to a deployment that has at least one runner -func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { - rows, err := q.db.QueryContext(ctx, getSubscriptionsNeedingUpdate) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetSubscriptionsNeedingUpdateRow - for rows.Next() { - var i GetSubscriptionsNeedingUpdateRow - if err := rows.Scan( - &i.Key, - &i.Cursor, - &i.Topic, - &i.Name, - &i.DeploymentKey, - &i.RequestKey, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getTopic = `-- name: GetTopic :one -SELECT id, key, created_at, name, type, head, module_name -FROM topics -WHERE id = $1::BIGINT -` - -func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) { - row := q.db.QueryRowContext(ctx, getTopic, dollar_1) - var i Topic - err := row.Scan( - &i.ID, - &i.Key, - &i.CreatedAt, - &i.Name, - &i.Type, - &i.Head, - &i.ModuleName, - ) - return i, err -} - -const getTopicEvent = `-- name: GetTopicEvent :one -SELECT id, created_at, key, topic_id, caller, request_key, trace_context, payload -FROM topic_events -WHERE id = $1::BIGINT -` - -func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) { - row := q.db.QueryRowContext(ctx, getTopicEvent, dollar_1) - var i TopicEvent - err := row.Scan( - &i.ID, - &i.CreatedAt, - &i.Key, - &i.TopicID, - &i.Caller, - &i.RequestKey, - &i.TraceContext, - &i.Payload, - ) - return i, err -} - -const insertSubscriber = `-- name: InsertSubscriber :exec -INSERT INTO topic_subscribers ( - key, - topic_subscriptions_id, - deployment_key, - sink, - retry_attempts, - backoff, - max_backoff, - catch_verb -) -VALUES ( - $1::subscriber_key, - ( - SELECT topic_subscriptions.id as id - FROM topic_subscriptions - WHERE module_name = $2::TEXT - AND topic_subscriptions.name = $3::TEXT - ), - $4::deployment_key, - $5, - $6, - $7::interval, - $8::interval, - $9 - ) -` - -type InsertSubscriberParams struct { - Key model.SubscriberKey - Module string - SubscriptionName string - Deployment model.DeploymentKey - Sink schema.RefKey - RetryAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] -} - -func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { - _, err := q.db.ExecContext(ctx, insertSubscriber, - arg.Key, - arg.Module, - arg.SubscriptionName, - arg.Deployment, - arg.Sink, - arg.RetryAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - ) - return err -} - -const publishEventForTopic = `-- name: PublishEventForTopic :exec -INSERT INTO topic_events ( - "key", - topic_id, - caller, - payload, - request_key, - trace_context -) -VALUES ( - $1::topic_event_key, - ( - SELECT topics.id - FROM topics - WHERE module_name = $2::TEXT - AND topics.name = $3::TEXT - ), - $4::TEXT, - $5, - $6::TEXT, - $7::jsonb - ) -` - -type PublishEventForTopicParams struct { - Key model.TopicEventKey - Module string - Topic string - Caller string - Payload json.RawMessage - RequestKey string - TraceContext json.RawMessage -} - -func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error { - _, err := q.db.ExecContext(ctx, publishEventForTopic, - arg.Key, - arg.Module, - arg.Topic, - arg.Caller, - arg.Payload, - arg.RequestKey, - arg.TraceContext, - ) - return err -} - -const setSubscriptionCursor = `-- name: SetSubscriptionCursor :exec -WITH event AS ( - SELECT id, created_at, key, topic_id, payload - FROM topic_events - WHERE "key" = $2::topic_event_key -) -UPDATE topic_subscriptions -SET cursor = (SELECT id FROM event) -WHERE key = $1::subscription_key -` - -func (q *Queries) SetSubscriptionCursor(ctx context.Context, column1 model.SubscriptionKey, column2 model.TopicEventKey) error { - _, err := q.db.ExecContext(ctx, setSubscriptionCursor, column1, column2) - return err -} - -const upsertSubscription = `-- name: UpsertSubscription :one -INSERT INTO topic_subscriptions ( - key, - topic_id, - module_name, - deployment_key, - name) -VALUES ( - $1::subscription_key, - ( - SELECT topics.id as id - FROM topics - WHERE module_name = $2::TEXT - AND topics.name = $3::TEXT - ), - $4::TEXT, - $5::deployment_key, - $6::TEXT - ) -ON CONFLICT (name, module_name) DO - UPDATE SET - topic_id = excluded.topic_id, - deployment_key = $5::deployment_key -RETURNING - id, - CASE - WHEN xmax = 0 THEN true - ELSE false - END AS inserted -` - -type UpsertSubscriptionParams struct { - Key model.SubscriptionKey - TopicModule string - TopicName string - Module string - Deployment model.DeploymentKey - Name string -} - -type UpsertSubscriptionRow struct { - ID int64 - Inserted bool -} - -func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) { - row := q.db.QueryRowContext(ctx, upsertSubscription, - arg.Key, - arg.TopicModule, - arg.TopicName, - arg.Module, - arg.Deployment, - arg.Name, - ) - var i UpsertSubscriptionRow - err := row.Scan(&i.ID, &i.Inserted) - return i, err -} - -const upsertTopic = `-- name: UpsertTopic :exec -INSERT INTO topics (key, module_name, name, type) -VALUES ( - $1::topic_key, - $2::TEXT, - $3::TEXT, - $4::TEXT - ) -ON CONFLICT (name, module_name) DO - UPDATE SET - type = $4::TEXT -RETURNING id -` - -type UpsertTopicParams struct { - Topic model.TopicKey - Module string - Name string - EventType string -} - -func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error { - _, err := q.db.ExecContext(ctx, upsertTopic, - arg.Topic, - arg.Module, - arg.Name, - arg.EventType, - ) - return err -} diff --git a/backend/controller/dal/model/model.go b/backend/controller/dal/model/model.go deleted file mode 100644 index 0982e5eba9..0000000000 --- a/backend/controller/dal/model/model.go +++ /dev/null @@ -1,89 +0,0 @@ -package model - -import ( - "fmt" - "time" - - "github.com/alecthomas/types/optional" - - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" - "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/schema" - "github.com/TBD54566975/ftl/internal/sha256" -) - -type Runner struct { - Key model.RunnerKey - Endpoint string - ReservationTimeout optional.Option[time.Duration] - Module optional.Option[string] - Deployment model.DeploymentKey - Labels model.Labels -} - -func (Runner) notification() {} - -type Reconciliation struct { - Deployment model.DeploymentKey - Module string - Language string - - AssignedReplicas int - RequiredReplicas int -} - -type ControllerState string - -type RequestOrigin string - -type Deployment struct { - Key model.DeploymentKey - Language string - Module string - MinReplicas int - Schema *schema.Module - CreatedAt time.Time - Labels model.Labels -} - -func (d Deployment) String() string { return d.Key.String() } - -func (d Deployment) notification() {} - -type Controller struct { - Key model.ControllerKey - Endpoint string - State ControllerState -} - -type Status struct { - Controllers []Controller - Runners []Runner - Deployments []Deployment -} - -type DeploymentArtefact struct { - Digest sha256.SHA256 - Executable bool - Path string -} - -func (d *DeploymentArtefact) ToProto() *ftlv1.DeploymentArtefact { - return &ftlv1.DeploymentArtefact{ - Digest: d.Digest.String(), - Executable: d.Executable, - Path: d.Path, - } -} - -func DeploymentArtefactFromProto(in *ftlv1.DeploymentArtefact) (DeploymentArtefact, error) { - digest, err := sha256.ParseSHA256(in.Digest) - if err != nil { - return DeploymentArtefact{}, fmt.Errorf("invalid digest: %w", err) - } - return DeploymentArtefact{ - Digest: digest, - Executable: in.Executable, - Path: in.Path, - }, nil -} diff --git a/backend/controller/scheduledtask/scheduledtask.go b/backend/controller/scheduledtask/scheduledtask.go index 2e261c82db..79f432d6da 100644 --- a/backend/controller/scheduledtask/scheduledtask.go +++ b/backend/controller/scheduledtask/scheduledtask.go @@ -11,7 +11,6 @@ import ( "github.com/benbjohnson/clock" "github.com/jpillora/backoff" - dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -33,8 +32,6 @@ type descriptor struct { // run. type Job func(ctx context.Context) (time.Duration, error) -type DALFunc func(ctx context.Context, all bool) ([]dalmodel.Controller, error) - // Scheduler is a task scheduler for the controller. // // Each job runs in its own goroutine. diff --git a/backend/controller/scheduledtask/scheduledtask_test.go b/backend/controller/scheduledtask/scheduledtask_test.go index cfde3ae78e..6dee91887d 100644 --- a/backend/controller/scheduledtask/scheduledtask_test.go +++ b/backend/controller/scheduledtask/scheduledtask_test.go @@ -10,8 +10,8 @@ import ( "github.com/benbjohnson/clock" "github.com/jpillora/backoff" - dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/state" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -26,15 +26,15 @@ func TestScheduledTask(t *testing.T) { var multiCount atomic.Int64 type controller struct { - controller dalmodel.Controller + controller state.Controller cron *Scheduler } controllers := []*controller{ - {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8080")}}, - {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8081")}}, - {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8082")}}, - {controller: dalmodel.Controller{Key: model.NewControllerKey("localhost", "8083")}}, + {controller: state.Controller{Key: model.NewControllerKey("localhost", "8080")}}, + {controller: state.Controller{Key: model.NewControllerKey("localhost", "8081")}}, + {controller: state.Controller{Key: model.NewControllerKey("localhost", "8082")}}, + {controller: state.Controller{Key: model.NewControllerKey("localhost", "8083")}}, } clock := clock.NewMock() diff --git a/backend/controller/state/controller.go b/backend/controller/state/controller.go new file mode 100644 index 0000000000..321b36ef46 --- /dev/null +++ b/backend/controller/state/controller.go @@ -0,0 +1,8 @@ +package state + +import "github.com/TBD54566975/ftl/internal/model" + +type Controller struct { + Endpoint string + Key model.ControllerKey +} diff --git a/sqlc.yaml b/sqlc.yaml index 92e47c9d33..6f5d62d381 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -11,7 +11,7 @@ sql: go: &gengo package: "sql" omit_unused_structs: true - out: "backend/controller/dal/internal/sql" + out: "backend/controller/pubsub/internal/sql" emit_interface: true query_parameter_limit: 3 overrides: