From cc211c1b66d613e10f9d5ef0d59c70b4480d01e3 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 31 May 2024 14:04:37 +1000 Subject: [PATCH] feat: add topics, subscriptions & subscribers to db on deployment (#1608) Keys will have the following prefixes: - topic: `top` - subscription: `sub` - subscriber: `subr` --- backend/controller/dal/dal.go | 59 ++++++++++++ backend/controller/dal/notify.go | 2 + backend/controller/sql/models.go | 11 ++- backend/controller/sql/querier.go | 3 + backend/controller/sql/queries.sql | 46 +++++++++ backend/controller/sql/queries.sql.go | 104 +++++++++++++++++++++ backend/controller/sql/schema/001_init.sql | 9 +- internal/model/cron_job_key.go | 13 +-- internal/model/subscriber_key.go | 34 +++++++ internal/model/subscription_key.go | 34 +++++++ internal/model/topic_key.go | 32 +++++++ sqlc.yaml | 6 ++ 12 files changed, 340 insertions(+), 13 deletions(-) create mode 100644 internal/model/subscriber_key.go create mode 100644 internal/model/subscription_key.go create mode 100644 internal/model/topic_key.go diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 7720c6ce42..e6170ee0f2 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -488,6 +488,41 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem return model.DeploymentKey{}, fmt.Errorf("failed to upsert module: %w", translatePGError(err)) } + // upsert topics + for _, decl := range moduleSchema.Decls { + t, ok := decl.(*schema.Topic) + if !ok { + continue + } + err := tx.UpsertTopic(ctx, sql.UpsertTopicParams{ + Topic: model.NewTopicKey(moduleSchema.Name, t.Name), + Module: moduleSchema.Name, + Name: t.Name, + EventType: t.Event.String(), + }) + if err != nil { + return model.DeploymentKey{}, fmt.Errorf("could not insert topic: %w", translatePGError(err)) + } + } + + // upsert subscriptions + for _, decl := range moduleSchema.Decls { + s, ok := decl.(*schema.Subscription) + if !ok { + continue + } + err := tx.UpsertSubscription(ctx, sql.UpsertSubscriptionParams{ + Key: model.NewSubscriptionKey(moduleSchema.Name, s.Name), + Module: moduleSchema.Name, + TopicModule: s.Topic.Module, + TopicName: s.Topic.Name, + Name: s.Name, + }) + if err != nil { + return model.DeploymentKey{}, fmt.Errorf("could not insert subscription: %w", translatePGError(err)) + } + } + deploymentKey := model.NewDeploymentKey(moduleSchema.Name) // Create the deployment @@ -550,6 +585,30 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem } } + // create subscribers + for _, decl := range moduleSchema.Decls { + v, ok := decl.(*schema.Verb) + if !ok { + continue + } + for _, md := range v.Metadata { + s, ok := md.(*schema.MetadataSubscriber) + if !ok { + continue + } + err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{ + Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name), + Module: moduleSchema.Name, + SubscriptionName: s.Name, + Deployment: deploymentKey, + Sink: v.Name, + }) + if err != nil { + return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err)) + } + } + } + return deploymentKey, nil } diff --git a/backend/controller/dal/notify.go b/backend/controller/dal/notify.go index 0bf5d1279a..d08ad9fa2b 100644 --- a/backend/controller/dal/notify.go +++ b/backend/controller/dal/notify.go @@ -119,6 +119,8 @@ func (d *DAL) publishNotification(ctx context.Context, notification event, logge logger.Tracef("Deployment notification: %s", deployment) d.DeploymentChanges.Publish(deployment) + case "topics": + // TODO: handle topics notifications default: panic(fmt.Sprintf("unknown table %q in DB notification", notification.Table)) } diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index b9f960a272..d378f9f88e 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -457,7 +457,7 @@ type Runner struct { type Topic struct { ID int64 - Key interface{} + Key model.TopicKey CreatedAt time.Time ModuleID int64 Name string @@ -473,18 +473,19 @@ type TopicEvent struct { type TopicSubscriber struct { ID int64 - Key interface{} + Key model.SubscriberKey CreatedAt time.Time TopicSubscriptionsID int64 DeploymentID int64 - Verb string + Sink string } type TopicSubscription struct { ID int64 - Key interface{} + Key model.SubscriptionKey CreatedAt time.Time TopicID int64 + ModuleID int64 Name string - Cursor int64 + Cursor optional.Option[int64] } diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index b8647b90e4..534a9fd0a4 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -74,6 +74,7 @@ type Querier interface { InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error InsertEvent(ctx context.Context, arg InsertEventParams) error InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error + InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error // Mark any controller entries that haven't been updated recently as dead. KillStaleControllers(ctx context.Context, timeout time.Duration) (int64, error) KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) @@ -103,6 +104,8 @@ type Querier interface { // there is no corresponding deployment, then the deployment ID is -1 // and the parent statement will fail due to a foreign key constraint. UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (optional.Option[int64], error) + UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) error + UpsertTopic(ctx context.Context, arg UpsertTopicParams) error } var _ Querier = (*Queries)(nil) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 6f7fc73c44..557994445f 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -613,6 +613,52 @@ WHERE fsm = @fsm::schema_ref AND key = @key::TEXT RETURNING true; +-- name: UpsertTopic :exec +INSERT INTO topics (key, module_id, name, type) +VALUES ( + sqlc.arg('topic')::topic_key, + (SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT LIMIT 1), + sqlc.arg('name')::TEXT, + sqlc.arg('event_type')::TEXT +) +ON CONFLICT (name, module_id) DO +UPDATE SET + type = sqlc.arg('event_type')::TEXT +RETURNING id; + +-- name: UpsertSubscription :exec +INSERT INTO topic_subscriptions (key, topic_id, module_id, name) +VALUES ( + sqlc.arg('key')::subscription_key, + ( + SELECT topics.id as id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = sqlc.arg('topic_module')::TEXT + AND topics.name = sqlc.arg('topic_name')::TEXT + ), + (SELECT id FROM modules WHERE name = sqlc.arg('module')::TEXT), + sqlc.arg('name')::TEXT +) +ON CONFLICT (name, module_id) DO +UPDATE SET + topic_id = excluded.topic_id +RETURNING id; + +-- name: InsertSubscriber :exec +INSERT INTO topic_subscribers (key, topic_subscriptions_id, deployment_id, sink) +VALUES ( + sqlc.arg('key')::subscriber_key, + ( + SELECT topic_subscriptions.id as id + FROM topic_subscriptions + INNER JOIN modules ON topic_subscriptions.module_id = modules.id + WHERE modules.name = sqlc.arg('module')::TEXT + AND topic_subscriptions.name = sqlc.arg('subscription_name')::TEXT + ), + (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key), + sqlc.arg('sink')::TEXT); + -- name: GetModuleConfiguration :one SELECT value FROM module_configuration diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 1343228d47..3152c20c6a 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -1633,6 +1633,40 @@ func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) return err } +const insertSubscriber = `-- name: InsertSubscriber :exec +INSERT INTO topic_subscribers (key, topic_subscriptions_id, deployment_id, sink) +VALUES ( + $1::subscriber_key, + ( + SELECT topic_subscriptions.id as id + FROM topic_subscriptions + INNER JOIN modules ON topic_subscriptions.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topic_subscriptions.name = $3::TEXT + ), + (SELECT id FROM deployments WHERE key = $4::deployment_key), + $5::TEXT) +` + +type InsertSubscriberParams struct { + Key model.SubscriberKey + Module string + SubscriptionName string + Deployment model.DeploymentKey + Sink string +} + +func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { + _, err := q.db.Exec(ctx, insertSubscriber, + arg.Key, + arg.Module, + arg.SubscriptionName, + arg.Deployment, + arg.Sink, + ) + return err +} + const killStaleControllers = `-- name: KillStaleControllers :one WITH matches AS ( UPDATE controller @@ -2091,3 +2125,73 @@ func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (opt err := row.Scan(&deployment_id) return deployment_id, err } + +const upsertSubscription = `-- name: UpsertSubscription :exec +INSERT INTO topic_subscriptions (key, topic_id, module_id, name) +VALUES ( + $1::subscription_key, + ( + SELECT topics.id as id + FROM topics + INNER JOIN modules ON topics.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topics.name = $3::TEXT + ), + (SELECT id FROM modules WHERE name = $4::TEXT), + $5::TEXT +) +ON CONFLICT (name, module_id) DO +UPDATE SET + topic_id = excluded.topic_id +RETURNING id +` + +type UpsertSubscriptionParams struct { + Key model.SubscriptionKey + TopicModule string + TopicName string + Module string + Name string +} + +func (q *Queries) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) error { + _, err := q.db.Exec(ctx, upsertSubscription, + arg.Key, + arg.TopicModule, + arg.TopicName, + arg.Module, + arg.Name, + ) + return err +} + +const upsertTopic = `-- name: UpsertTopic :exec +INSERT INTO topics (key, module_id, name, type) +VALUES ( + $1::topic_key, + (SELECT id FROM modules WHERE name = $2::TEXT LIMIT 1), + $3::TEXT, + $4::TEXT +) +ON CONFLICT (name, module_id) DO +UPDATE SET + type = $4::TEXT +RETURNING id +` + +type UpsertTopicParams struct { + Topic model.TopicKey + Module string + Name string + EventType string +} + +func (q *Queries) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error { + _, err := q.db.Exec(ctx, upsertTopic, + arg.Topic, + arg.Module, + arg.Name, + arg.EventType, + ) + return err +} diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index 6203dce488..e48a04119a 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -352,13 +352,18 @@ CREATE TABLE topic_subscriptions ( topic_id BIGINT NOT NULL REFERENCES topics(id) ON DELETE CASCADE, + -- Each subscription is associated with an owning module. + module_id BIGINT NOT NULL REFERENCES modules(id), + -- Name of the subscription. name TEXT UNIQUE NOT NULL, -- Cursor pointing into the topic_events table. - cursor BIGINT NOT NULL REFERENCES topic_events(id) ON DELETE CASCADE + cursor BIGINT REFERENCES topic_events(id) ON DELETE CASCADE ); +CREATE UNIQUE INDEX topic_subscriptions_module_name_idx ON topic_subscriptions(module_id, name); + CREATE DOMAIN subscriber_key AS TEXT; -- A subscriber to a topic. @@ -373,7 +378,7 @@ CREATE TABLE topic_subscribers ( deployment_id BIGINT NOT NULL REFERENCES deployments(id) ON DELETE CASCADE, -- Name of the verb to call on the deployment. - verb TEXT NOT NULL + sink TEXT NOT NULL ); CREATE DOMAIN lease_key AS TEXT; diff --git a/internal/model/cron_job_key.go b/internal/model/cron_job_key.go index fea4d2984d..5150c3926e 100644 --- a/internal/model/cron_job_key.go +++ b/internal/model/cron_job_key.go @@ -2,30 +2,31 @@ package model import ( "errors" - "strings" ) type CronJobKey = KeyType[CronJobPayload, *CronJobPayload] func NewCronJobKey(module, verb string) CronJobKey { - return newKey[CronJobPayload](strings.Join([]string{module, verb}, "-")) + return newKey[CronJobPayload](module, verb) } func ParseCronJobKey(key string) (CronJobKey, error) { return parseKey[CronJobPayload](key) } type CronJobPayload struct { - Ref string + Module string + Verb string } var _ KeyPayload = (*CronJobPayload)(nil) func (d *CronJobPayload) Kind() string { return "crn" } -func (d *CronJobPayload) String() string { return d.Ref } +func (d *CronJobPayload) String() string { return d.Module + "-" + d.Verb } func (d *CronJobPayload) Parse(parts []string) error { - if len(parts) == 0 { + if len(parts) != 2 { return errors.New("expected - but got empty string") } - d.Ref = strings.Join(parts, "-") + d.Module = parts[0] + d.Verb = parts[1] return nil } func (d *CronJobPayload) RandomBytes() int { return 10 } diff --git a/internal/model/subscriber_key.go b/internal/model/subscriber_key.go new file mode 100644 index 0000000000..24aed8595d --- /dev/null +++ b/internal/model/subscriber_key.go @@ -0,0 +1,34 @@ +package model + +import ( + "errors" +) + +type SubscriberKey = KeyType[SubscriberPayload, *SubscriberPayload] + +func NewSubscriberKey(module, subscription, sink string) SubscriberKey { + return newKey[SubscriberPayload](module, subscription) +} + +func ParseSubscriberKey(key string) (SubscriberKey, error) { + return parseKey[SubscriberPayload](key) +} + +type SubscriberPayload struct { + Module string + Subscription string +} + +var _ KeyPayload = (*SubscriberPayload)(nil) + +func (s *SubscriberPayload) Kind() string { return "subr" } +func (s *SubscriberPayload) String() string { return s.Module + "-" + s.Subscription } +func (s *SubscriberPayload) Parse(parts []string) error { + if len(parts) != 2 { + return errors.New("expected - but got empty string") + } + s.Module = parts[0] + s.Subscription = parts[1] + return nil +} +func (s *SubscriberPayload) RandomBytes() int { return 10 } diff --git a/internal/model/subscription_key.go b/internal/model/subscription_key.go new file mode 100644 index 0000000000..75f58bc9c2 --- /dev/null +++ b/internal/model/subscription_key.go @@ -0,0 +1,34 @@ +package model + +import ( + "errors" +) + +type SubscriptionKey = KeyType[SubscriptionPayload, *SubscriptionPayload] + +func NewSubscriptionKey(module, name string) SubscriptionKey { + return newKey[SubscriptionPayload](module, name) +} + +func ParseSubscriptionKey(key string) (SubscriptionKey, error) { + return parseKey[SubscriptionPayload](key) +} + +type SubscriptionPayload struct { + Module string + Name string +} + +var _ KeyPayload = (*SubscriptionPayload)(nil) + +func (s *SubscriptionPayload) Kind() string { return "sub" } +func (s *SubscriptionPayload) String() string { return s.Module + "-" + s.Name } +func (s *SubscriptionPayload) Parse(parts []string) error { + if len(parts) != 2 { + return errors.New("expected - but got empty string") + } + s.Module = parts[0] + s.Name = parts[1] + return nil +} +func (s *SubscriptionPayload) RandomBytes() int { return 10 } diff --git a/internal/model/topic_key.go b/internal/model/topic_key.go new file mode 100644 index 0000000000..cc9c8f366f --- /dev/null +++ b/internal/model/topic_key.go @@ -0,0 +1,32 @@ +package model + +import ( + "errors" +) + +type TopicKey = KeyType[TopicPayload, *TopicPayload] + +func NewTopicKey(module, name string) TopicKey { + return newKey[TopicPayload](module, name) +} + +func ParseTopicKey(key string) (TopicKey, error) { return parseKey[TopicPayload](key) } + +type TopicPayload struct { + Module string + Name string +} + +var _ KeyPayload = (*TopicPayload)(nil) + +func (t *TopicPayload) Kind() string { return "top" } +func (t *TopicPayload) String() string { return t.Module + "-" + t.Name } +func (t *TopicPayload) Parse(parts []string) error { + if len(parts) != 2 { + return errors.New("expected - but got empty string") + } + t.Module = parts[0] + t.Name = parts[1] + return nil +} +func (t *TopicPayload) RandomBytes() int { return 10 } diff --git a/sqlc.yaml b/sqlc.yaml index 0ecd602b31..3e7f4cb135 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -109,6 +109,12 @@ sql: go_type: "github.com/TBD54566975/ftl/internal/model.DeploymentKey" - column: "events.payload" go_type: "encoding/json.RawMessage" + - db_type: "topic_key" + go_type: "github.com/TBD54566975/ftl/internal/model.TopicKey" + - db_type: "subscription_key" + go_type: "github.com/TBD54566975/ftl/internal/model.SubscriptionKey" + - db_type: "subscriber_key" + go_type: "github.com/TBD54566975/ftl/internal/model.SubscriberKey" rules: - sqlc/db-prepare # - postgresql-query-too-costly