From c9a8561e22a2509719ad987106e26fe5a07adfde Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Thu, 6 Jun 2024 13:16:39 +1000 Subject: [PATCH] feat: pubsub manager (#1673) closes https://github.com/TBD54566975/ftl/issues/1596 closes https://github.com/TBD54566975/ftl/issues/1642 How it works: - pubsub managers poll for subscriptions that are behind the topic's head - locks these subscriptions for the tx - skips subscriptions that are already locked - Then finds the next event for each subscription - If the event is newer than our artificial delay, we skip this subscription on this attempt Pubsub also speeds things up by: - Polling once a second - When it schedules an async call, notifies controller so we can attempt to pick it up straight away - this is wrapped in a mutex so that we don't accidentally get more and more async calls running at once on a single controller - When the async call is completed, pubsub is notified so that it can immediately try progressing subscriptions again --- backend/controller/controller.go | 34 +++- backend/controller/dal/pubsub.go | 76 +++++--- backend/controller/pubsub/integration_test.go | 56 +++++- backend/controller/pubsub/manager.go | 171 +++++------------- .../pubsub/testdata/go/publisher/publisher.go | 30 ++- .../testdata/go/subscriber/subscriber.go | 2 +- backend/controller/sql/querier.go | 8 +- backend/controller/sql/queries.sql | 47 +++-- backend/controller/sql/queries.sql.go | 81 ++++----- backend/controller/sql/schema/001_init.sql | 11 +- integration/actions.go | 11 +- 11 files changed, 281 insertions(+), 246 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 14f5ec7c5f..79443941c9 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -198,6 +198,7 @@ type Service struct { runnerScaling scaling.RunnerScaling increaseReplicaFailures map[string]int + asyncCallsLock sync.Mutex } func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) { @@ -231,9 +232,8 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling. svc.cronJobs = cronSvc svc.controllerListListeners = append(svc.controllerListListeners, cronSvc) - pubSub := pubsub.New(ctx, key, db) + pubSub := pubsub.New(ctx, db, svc.tasks, svc) svc.pubSub = pubSub - svc.controllerListListeners = append(svc.controllerListListeners, pubSub) go svc.syncSchema(ctx) @@ -1198,7 +1198,22 @@ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) { return time.Second, nil } +// AsyncCallWasAdded is an optional notification that an async call was added by this controller +// +// It allows us to speed up execution of scheduled async calls rather than waiting for the next poll time. +func (s *Service) AsyncCallWasAdded(ctx context.Context) { + go func() { + if _, err := s.executeAsyncCalls(ctx); err != nil { + log.FromContext(ctx).Errorf(err, "failed to progress subscriptions") + } + }() +} + func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) { + // There are multiple entry points into this function, but we want the controller to handle async calls one at a time. + s.asyncCallsLock.Lock() + defer s.asyncCallsLock.Unlock() + logger := log.FromContext(ctx) logger.Tracef("Acquiring async call") @@ -1238,6 +1253,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) // Will retry, do not propagate failure yet. return nil } + // Allow for handling of completion based on origin switch origin := call.Origin.(type) { case dal.AsyncOriginFSM: return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed) @@ -1252,6 +1268,20 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) if err != nil { return 0, fmt.Errorf("failed to complete async call: %w", err) } + go func() { + // Post-commit notification based on origin + switch origin := call.Origin.(type) { + case dal.AsyncOriginFSM: + break + + case dal.AsyncOriginPubSub: + s.pubSub.AsyncCallDidCommit(ctx, origin) + + default: + break + } + }() + return 0, nil } diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 8ca234b7da..d26f08bd4c 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -3,12 +3,13 @@ package dal import ( "context" "fmt" + "time" "github.com/TBD54566975/ftl/backend/controller/sql" "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/slices" - "github.com/alecthomas/types/optional" ) func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error { @@ -39,43 +40,64 @@ func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscr }), nil } -func (d *DAL) ProgressSubscription(ctx context.Context, subscription model.Subscription) error { +func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) { tx, err := d.Begin(ctx) if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) + return 0, fmt.Errorf("failed to begin transaction: %w", err) } defer tx.CommitOrRollback(ctx, &err) - nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor) - if err != nil { - return fmt.Errorf("failed to get next cursor: %w", translatePGError(err)) - } + logger := log.FromContext(ctx) - result, err := tx.db.LockSubscriptionAndGetSink(ctx, subscription.Key, subscription.Cursor) + // get subscriptions needing update + // also gets a lock on the subscription, and skips any subscriptions locked by others + subs, err := tx.db.GetSubscriptionsNeedingUpdate(ctx) if err != nil { - return fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err)) + return 0, fmt.Errorf("could not get subscriptions to progress: %w", translatePGError(err)) } - err = tx.db.BeginConsumingTopicEvent(ctx, optional.Some(result.SubscriptionID), nextCursor.Event) - if err != nil { - return fmt.Errorf("failed to progress subscription: %w", translatePGError(err)) - } + successful := 0 + for _, subscription := range subs { + nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor) + if err != nil { + return 0, fmt.Errorf("failed to get next cursor: %w", translatePGError(err)) + } + nextCursorKey, ok := nextCursor.Event.Get() + if !ok { + return 0, fmt.Errorf("could not find event to progress subscription: %w", translatePGError(err)) + } + if nextCreatedAt, ok := nextCursor.CreatedAt.Get(); ok && nextCreatedAt.Add(eventConsumptionDelay).After(time.Now()) { + logger.Tracef("Skipping subscription %s because event is too new", subscription.Key) + continue + } - origin := AsyncOriginPubSub{ - Subscription: schema.RefKey{ - Module: subscription.Key.Payload.Module, - Name: subscription.Key.Payload.Name, - }, - } - _, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ - Verb: result.Sink, - Origin: origin.String(), - Request: nextCursor.Payload, - }) - if err != nil { - return fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err)) + sink, err := tx.db.GetRandomSubscriberSink(ctx, subscription.Key) + if err != nil { + return 0, fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err)) + } + + err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey) + if err != nil { + return 0, fmt.Errorf("failed to progress subscription: %w", translatePGError(err)) + } + + origin := AsyncOriginPubSub{ + Subscription: schema.RefKey{ + Module: subscription.Key.Payload.Module, + Name: subscription.Key.Payload.Name, + }, + } + _, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{ + Verb: sink, + Origin: origin.String(), + Request: nextCursor.Payload, + }) + if err != nil { + return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err)) + } + successful++ } - return nil + return successful, nil } func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error { diff --git a/backend/controller/pubsub/integration_test.go b/backend/controller/pubsub/integration_test.go index 09ee4cbac6..c9e462483c 100644 --- a/backend/controller/pubsub/integration_test.go +++ b/backend/controller/pubsub/integration_test.go @@ -13,20 +13,20 @@ import ( ) func TestPubSub(t *testing.T) { + calls := 20 + events := calls * 10 in.Run(t, "", in.CopyModule("publisher"), in.CopyModule("subscriber"), in.Deploy("publisher"), in.Deploy("subscriber"), - // publish 2 events - in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}), - in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}), + // publish events + in.Repeat(calls, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})), - // TODO: speed this up when we have proper pubsub implementation - in.Sleep(time.Second*4*2), + in.Sleep(time.Second*4), - // check that there are 2 successful async calls + // check that there are the right amount of successful async calls in.QueryRow("ftl", fmt.Sprintf(` SELECT COUNT(*) @@ -35,6 +35,48 @@ func TestPubSub(t *testing.T) { state = 'success' AND origin = '%s' `, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "test_subscription"}}.String()), - 2), + events), + ) +} + +func TestPubSubConsumptionDelay(t *testing.T) { + in.Run(t, "", + in.CopyModule("publisher"), + in.CopyModule("subscriber"), + in.Deploy("publisher"), + in.Deploy("subscriber"), + + // publish events with a small delay between each + // pubsub should trigger its poll a few times during this period + // each time it should continue processing each event until it reaches one that is too new to process + func(t testing.TB, ic in.TestContext) { + for i := 0; i < 60; i++ { + in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic) + time.Sleep(time.Millisecond * 50) + } + }, + + in.Sleep(time.Second*2), + + // Get all event created ats, and all async call created ats + // Compare each, make sure none are less than 0.2s of each other + in.QueryRow("ftl", ` + WITH event_times AS ( + SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num + FROM ( + select * from topic_events order by created_at + ) + ), + async_call_times AS ( + SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num + FROM ( + select * from async_calls ac order by created_at + ) + ) + SELECT COUNT(*) + FROM event_times + JOIN async_call_times ON event_times.row_num = async_call_times.row_num + WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2; + `, 0), ) } diff --git a/backend/controller/pubsub/manager.go b/backend/controller/pubsub/manager.go index 4b2b441342..cbf0af2008 100644 --- a/backend/controller/pubsub/manager.go +++ b/backend/controller/pubsub/manager.go @@ -2,161 +2,76 @@ package pubsub import ( "context" - "fmt" "time" "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/model" - "github.com/TBD54566975/ftl/internal/slices" - "github.com/alecthomas/atomic" - "github.com/serialx/hashring" + "github.com/jpillora/backoff" ) const ( - controllersPerSubscription = 2 + // Events can be added simultaneously, which can cause events with out of order create_at values + // By adding a delay, we ensure that by the time we read the events, no new events will be added + // with earlier created_at values. + eventConsumptionDelay = 200 * time.Millisecond ) type DAL interface { - GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error) - ProgressSubscription(ctx context.Context, subscription model.Subscription) error + ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) CompleteEventForSubscription(ctx context.Context, module, name string) error } -type Manager struct { - key model.ControllerKey - dal DAL - hashRingState atomic.Value[*hashRingState] -} - -type hashRingState struct { - hashRing *hashring.HashRing - controllers []dal.Controller - idx int -} - -func New(ctx context.Context, key model.ControllerKey, dal *dal.DAL) *Manager { - m := &Manager{ - key: key, - dal: dal, - } - - go m.watchForUpdates(ctx) - return m +type Scheduler interface { + Singleton(retry backoff.Backoff, job scheduledtask.Job) + Parallel(retry backoff.Backoff, job scheduledtask.Job) } -func (m *Manager) HandleTopicNotification() { - -} - -func (m *Manager) HandleEventNotification() { - -} - -// UpdatedControllerList synchronises the hash ring with the active controllers. -func (m *Manager) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) { - logger := log.FromContext(ctx).Scope("cron") - controllerIdx := -1 - for idx, controller := range controllers { - if controller.Key.String() == m.key.String() { - controllerIdx = idx - break - } - } - if controllerIdx == -1 { - logger.Tracef("controller %q not found in list of controllers", m.key) - } - - oldState := m.hashRingState.Load() - if oldState != nil && len(oldState.controllers) == len(controllers) { - hasChanged := false - for idx, new := range controllers { - old := oldState.controllers[idx] - if new.Key.String() != old.Key.String() { - hasChanged = true - break - } - } - if !hasChanged { - return - } - } - - hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() })) - m.hashRingState.Store(&hashRingState{ - hashRing: hashRing, - controllers: controllers, - idx: controllerIdx, - }) +type AsyncCallListener interface { + AsyncCallWasAdded(ctx context.Context) } -// isResponsibleForSubscription indicates whether a this service should be responsible for attempting jobs, -// or if enough other controllers will handle it. This allows us to spread the job load across controllers. -func (m *Manager) isResponsibleForSubscription(subscription model.Subscription) bool { - hashringState := m.hashRingState.Load() - if hashringState == nil { - return true - } - - initialKey, ok := hashringState.hashRing.GetNode(subscription.Key.String()) - if !ok { - return true - } - - initialIdx := -1 - for idx, controller := range hashringState.controllers { - if controller.Key.String() == initialKey { - initialIdx = idx - break - } - } - if initialIdx == -1 { - return true - } - - if initialIdx+controllersPerSubscription > len(hashringState.controllers) { - // wraps around - return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerSubscription)-len(hashringState.controllers) - } - return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerSubscription +type Manager struct { + dal DAL + scheduler Scheduler + asyncCallListener AsyncCallListener } -func (m *Manager) watchForUpdates(ctx context.Context) { - logger := log.FromContext(ctx).Scope("pubsub") - - // TODO: handle events here. Currently a demo implementation - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second * 3): - if err := m.progressSubscriptions(ctx); err != nil { - logger.Errorf(err, "failed to progress subscriptions") - continue - } - } +func New(ctx context.Context, dal *dal.DAL, scheduler Scheduler, asyncCallListener AsyncCallListener) *Manager { + m := &Manager{ + dal: dal, + scheduler: scheduler, + asyncCallListener: asyncCallListener, } + m.scheduler.Parallel(backoff.Backoff{ + Min: 1 * time.Second, + Max: 5 * time.Second, + Jitter: true, + Factor: 1.5, + }, m.progressSubscriptions) + return m } -func (m *Manager) progressSubscriptions(ctx context.Context) (err error) { - subscriptions, err := m.dal.GetSubscriptionsNeedingUpdate(ctx) +func (m *Manager) progressSubscriptions(ctx context.Context) (time.Duration, error) { + count, err := m.dal.ProgressSubscriptions(ctx, eventConsumptionDelay) if err != nil { - return fmt.Errorf("failed to get subscriptions needing update: %w", err) + return 0, err } - for _, subscription := range subscriptions { - if !m.isResponsibleForSubscription(subscription) { - continue - } - logger := log.FromContext(ctx) - - err := m.dal.ProgressSubscription(ctx, subscription) - if err != nil { - logger.Errorf(err, "failed to progress subscription") - } + if count > 0 { + // notify controller that we added an async call + m.asyncCallListener.AsyncCallWasAdded(ctx) } - return nil + return time.Second, err } +// OnCallCompletion is called within a transaction after an async call has completed to allow the subscription state to be updated. func (m *Manager) OnCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginPubSub, failed bool) error { return m.dal.CompleteEventForSubscription(ctx, origin.Subscription.Module, origin.Subscription.Name) } + +// AsyncCallDidCommit is called after an subscription's async call has been completed and committed to the database. +func (m *Manager) AsyncCallDidCommit(ctx context.Context, origin dal.AsyncOriginPubSub) { + if _, err := m.progressSubscriptions(ctx); err != nil { + log.FromContext(ctx).Errorf(err, "failed to progress subscriptions") + } +} diff --git a/backend/controller/pubsub/testdata/go/publisher/publisher.go b/backend/controller/pubsub/testdata/go/publisher/publisher.go index b390078493..1f03cb5a38 100644 --- a/backend/controller/pubsub/testdata/go/publisher/publisher.go +++ b/backend/controller/pubsub/testdata/go/publisher/publisher.go @@ -16,9 +16,31 @@ type PubSubEvent struct { } //ftl:verb -func Publish(ctx context.Context) error { +func PublishTen(ctx context.Context) error { logger := ftl.LoggerFromContext(ctx) - t := time.Now() - logger.Infof("Publishing %v", t) - return topic.Publish(ctx, PubSubEvent{Time: t}) + for i := 0; i < 10; i++ { + t := time.Now() + logger.Infof("Publishing %v", t) + err := topic.Publish(ctx, PubSubEvent{Time: t}) + if err != nil { + return err + } + time.Sleep(time.Microsecond * 20) + } + return nil +} + +//ftl:verb +func PublishOne(ctx context.Context) error { + logger := ftl.LoggerFromContext(ctx) + for i := 0; i < 10; i++ { + t := time.Now() + logger.Infof("Publishing %v", t) + err := topic.Publish(ctx, PubSubEvent{Time: t}) + if err != nil { + return err + } + time.Sleep(time.Microsecond * 20) + } + return nil } diff --git a/backend/controller/pubsub/testdata/go/subscriber/subscriber.go b/backend/controller/pubsub/testdata/go/subscriber/subscriber.go index f89ba53681..da1a72d904 100644 --- a/backend/controller/pubsub/testdata/go/subscriber/subscriber.go +++ b/backend/controller/pubsub/testdata/go/subscriber/subscriber.go @@ -11,7 +11,7 @@ var _ = ftl.Subscription(publisher.Test_topic, "test_subscription") //ftl:verb //ftl:subscribe test_subscription -func Echo(ctx context.Context, req publisher.PubSubEvent) error { +func Consume(ctx context.Context, req publisher.PubSubEvent) error { logger := ftl.LoggerFromContext(ctx) logger.Infof("Subscriber is processing %v", req.Time) return nil diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 166c0940cc..bd882760be 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -20,7 +20,7 @@ type Querier interface { // reservation key. AcquireAsyncCall(ctx context.Context, ttl time.Duration) (AcquireAsyncCallRow, error) AssociateArtefactWithDeployment(ctx context.Context, arg AssociateArtefactWithDeploymentParams) error - BeginConsumingTopicEvent(ctx context.Context, subscriptionID optional.Option[int64], event optional.Option[model.TopicEventKey]) error + BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error CompleteEventForSubscription(ctx context.Context, name string, module string) error // Create a new artefact and return the artefact ID. CreateArtefact(ctx context.Context, digest []byte, content []byte) (int64, error) @@ -65,6 +65,7 @@ type Querier interface { GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) GetNextEventForSubscription(ctx context.Context, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) + GetRandomSubscriberSink(ctx context.Context, key model.SubscriptionKey) (schema.RefKey, error) // Retrieve routing information for a runner. GetRouteForRunner(ctx context.Context, key model.RunnerKey) (GetRouteForRunnerRow, error) GetRoutingTable(ctx context.Context, modules []string) ([]GetRoutingTableRow, error) @@ -72,6 +73,9 @@ type Querier interface { GetRunnerState(ctx context.Context, key model.RunnerKey) (RunnerState, error) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) ([]GetStaleCronJobsRow, error) + // Results may not be ready to be scheduled yet due to event consumption delay + // Sorting ensures that brand new events (that may not be ready for consumption) + // don't prevent older events from being consumed GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error @@ -84,8 +88,6 @@ type Querier interface { KillStaleRunners(ctx context.Context, timeout time.Duration) (int64, error) ListModuleConfiguration(ctx context.Context) ([]ModuleConfiguration, error) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) - // get a lock on the subscription row, guaranteeing that it is idle and has not consumed more events - LockSubscriptionAndGetSink(ctx context.Context, key model.SubscriptionKey, cursor optional.Option[model.TopicEventKey]) (LockSubscriptionAndGetSinkRow, error) NewLease(ctx context.Context, key leases.Key, ttl time.Duration) (uuid.UUID, error) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error ReleaseLease(ctx context.Context, idempotencyKey uuid.UUID, key leases.Key) (bool, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 23e09b2c8f..542c223c79 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -678,16 +678,22 @@ VALUES ( ); -- name: GetSubscriptionsNeedingUpdate :many +-- Results may not be ready to be scheduled yet due to event consumption delay +-- Sorting ensures that brand new events (that may not be ready for consumption) +-- don't prevent older events from being consumed SELECT subs.key::subscription_key as key, - topic_events.key as cursor, + curser.key as cursor, topics.key::topic_key as topic, subs.name FROM topic_subscriptions subs LEFT JOIN topics ON subs.topic_id = topics.id -LEFT JOIN topic_events ON subs.cursor = topic_events.id +LEFT JOIN topic_events curser ON subs.cursor = curser.id WHERE subs.cursor IS DISTINCT FROM topics.head - AND subs.state = 'idle'; + AND subs.state = 'idle' +ORDER BY curser.created_at +LIMIT 3 +FOR UPDATE OF subs SKIP LOCKED; -- name: GetNextEventForSubscription :one WITH cursor AS ( @@ -698,7 +704,8 @@ WITH cursor AS ( WHERE "key" = sqlc.narg('cursor')::topic_event_key ) SELECT events."key" as event, - events.payload + events.payload, + events.created_at FROM topics LEFT JOIN topic_events as events ON events.topic_id = topics.id WHERE topics.key = sqlc.arg('topic')::topic_key @@ -706,28 +713,16 @@ WHERE topics.key = sqlc.arg('topic')::topic_key ORDER BY events.created_at, events.id LIMIT 1; --- name: LockSubscriptionAndGetSink :one -WITH subscriber AS ( - -- choose a random subscriber to execute the event - SELECT - subscribers.sink as sink - FROM topic_subscribers as subscribers - JOIN deployments ON subscribers.deployment_id = deployments.id - JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id - WHERE topic_subscriptions.key = sqlc.arg('key')::subscription_key - AND deployments.min_replicas > 0 - ORDER BY RANDOM() - LIMIT 1 -) --- get a lock on the subscription row, guaranteeing that it is idle and has not consumed more events +-- name: GetRandomSubscriberSink :one SELECT - id as subscription_id, - (SELECT sink FROM subscriber) AS sink -FROM topic_subscriptions -WHERE state = 'idle' - AND key = sqlc.arg('key')::subscription_key - AND cursor IS NOT DISTINCT FROM (SELECT id FROM topic_events WHERE "key" = sqlc.narg('cursor')::topic_event_key) -FOR UPDATE; + subscribers.sink as sink +FROM topic_subscribers as subscribers +JOIN deployments ON subscribers.deployment_id = deployments.id +JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id +WHERE topic_subscriptions.key = sqlc.arg('key')::subscription_key + AND deployments.min_replicas > 0 +ORDER BY RANDOM() +LIMIT 1; -- name: BeginConsumingTopicEvent :exec WITH event AS ( @@ -738,7 +733,7 @@ WITH event AS ( UPDATE topic_subscriptions SET state = 'executing', cursor = (SELECT id FROM event) -WHERE id = sqlc.arg('subscription_id'); +WHERE key = sqlc.arg('subscription')::subscription_key; -- name: CompleteEventForSubscription :exec WITH module AS ( diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index e67324423e..2c6e7cd65c 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -109,11 +109,11 @@ WITH event AS ( UPDATE topic_subscriptions SET state = 'executing', cursor = (SELECT id FROM event) -WHERE id = $1 +WHERE key = $1::subscription_key ` -func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscriptionID optional.Option[int64], event optional.Option[model.TopicEventKey]) error { - _, err := q.db.Exec(ctx, beginConsumingTopicEvent, subscriptionID, event) +func (q *Queries) BeginConsumingTopicEvent(ctx context.Context, subscription model.SubscriptionKey, event model.TopicEventKey) error { + _, err := q.db.Exec(ctx, beginConsumingTopicEvent, subscription, event) return err } @@ -1192,7 +1192,8 @@ WITH cursor AS ( WHERE "key" = $2::topic_event_key ) SELECT events."key" as event, - events.payload + events.payload, + events.created_at FROM topics LEFT JOIN topic_events as events ON events.topic_id = topics.id WHERE topics.key = $1::topic_key @@ -1202,14 +1203,15 @@ LIMIT 1 ` type GetNextEventForSubscriptionRow struct { - Event optional.Option[model.TopicEventKey] - Payload []byte + Event optional.Option[model.TopicEventKey] + Payload []byte + CreatedAt optional.Option[time.Time] } func (q *Queries) GetNextEventForSubscription(ctx context.Context, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) { row := q.db.QueryRow(ctx, getNextEventForSubscription, topic, cursor) var i GetNextEventForSubscriptionRow - err := row.Scan(&i.Event, &i.Payload) + err := row.Scan(&i.Event, &i.Payload, &i.CreatedAt) return i, err } @@ -1262,6 +1264,25 @@ func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, erro return items, nil } +const getRandomSubscriberSink = `-- name: GetRandomSubscriberSink :one +SELECT + subscribers.sink as sink +FROM topic_subscribers as subscribers +JOIN deployments ON subscribers.deployment_id = deployments.id +JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id +WHERE topic_subscriptions.key = $1::subscription_key + AND deployments.min_replicas > 0 +ORDER BY RANDOM() +LIMIT 1 +` + +func (q *Queries) GetRandomSubscriberSink(ctx context.Context, key model.SubscriptionKey) (schema.RefKey, error) { + row := q.db.QueryRow(ctx, getRandomSubscriberSink, key) + var sink schema.RefKey + err := row.Scan(&sink) + return sink, err +} + const getRouteForRunner = `-- name: GetRouteForRunner :one SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key, r.state FROM runners r @@ -1502,14 +1523,17 @@ func (q *Queries) GetStaleCronJobs(ctx context.Context, dollar_1 time.Duration) const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many SELECT subs.key::subscription_key as key, - topic_events.key as cursor, + curser.key as cursor, topics.key::topic_key as topic, subs.name FROM topic_subscriptions subs LEFT JOIN topics ON subs.topic_id = topics.id -LEFT JOIN topic_events ON subs.cursor = topic_events.id +LEFT JOIN topic_events curser ON subs.cursor = curser.id WHERE subs.cursor IS DISTINCT FROM topics.head AND subs.state = 'idle' +ORDER BY curser.created_at +LIMIT 3 +FOR UPDATE OF subs SKIP LOCKED ` type GetSubscriptionsNeedingUpdateRow struct { @@ -1519,6 +1543,9 @@ type GetSubscriptionsNeedingUpdateRow struct { Name string } +// Results may not be ready to be scheduled yet due to event consumption delay +// Sorting ensures that brand new events (that may not be ready for consumption) +// don't prevent older events from being consumed func (q *Queries) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) { rows, err := q.db.Query(ctx, getSubscriptionsNeedingUpdate) if err != nil { @@ -1871,42 +1898,6 @@ func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error return i, err } -const lockSubscriptionAndGetSink = `-- name: LockSubscriptionAndGetSink :one -WITH subscriber AS ( - -- choose a random subscriber to execute the event - SELECT - subscribers.sink as sink - FROM topic_subscribers as subscribers - JOIN deployments ON subscribers.deployment_id = deployments.id - JOIN topic_subscriptions ON subscribers.topic_subscriptions_id = topic_subscriptions.id - WHERE topic_subscriptions.key = $1::subscription_key - AND deployments.min_replicas > 0 - ORDER BY RANDOM() - LIMIT 1 -) -SELECT - id as subscription_id, - (SELECT sink FROM subscriber) AS sink -FROM topic_subscriptions -WHERE state = 'idle' - AND key = $1::subscription_key - AND cursor IS NOT DISTINCT FROM (SELECT id FROM topic_events WHERE "key" = $2::topic_event_key) -FOR UPDATE -` - -type LockSubscriptionAndGetSinkRow struct { - SubscriptionID int64 - Sink schema.RefKey -} - -// get a lock on the subscription row, guaranteeing that it is idle and has not consumed more events -func (q *Queries) LockSubscriptionAndGetSink(ctx context.Context, key model.SubscriptionKey, cursor optional.Option[model.TopicEventKey]) (LockSubscriptionAndGetSinkRow, error) { - row := q.db.QueryRow(ctx, lockSubscriptionAndGetSink, key, cursor) - var i LockSubscriptionAndGetSinkRow - err := row.Scan(&i.SubscriptionID, &i.Sink) - return i, err -} - const newLease = `-- name: NewLease :one INSERT INTO leases (idempotency_key, key, expires_at) VALUES (gen_random_uuid(), $1::lease_key, (NOW() AT TIME ZONE 'utc') + $2::interval) diff --git a/backend/controller/sql/schema/001_init.sql b/backend/controller/sql/schema/001_init.sql index 4abbae0c5e..e5d701cae2 100644 --- a/backend/controller/sql/schema/001_init.sql +++ b/backend/controller/sql/schema/001_init.sql @@ -351,18 +351,25 @@ CREATE TRIGGER topic_events_notify_event EXECUTE PROCEDURE notify_event(); -- Automatically update module_name when deployment_id is set or unset. +-- Subquery ensures that two events added at the same time that the head is calculated properly CREATE OR REPLACE FUNCTION topics_update_head() RETURNS TRIGGER AS $$ BEGIN UPDATE topics - SET head = NEW.id + SET head = ( + SELECT id + FROM topic_events + WHERE topic_id = NEW.topic_id + ORDER BY created_at DESC, id DESC + LIMIT 1 + ) WHERE id = NEW.topic_id; RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE TRIGGER topics_update_head - BEFORE INSERT OR UPDATE + AFTER INSERT OR UPDATE ON topic_events FOR EACH ROW EXECUTE PROCEDURE topics_update_head(); diff --git a/integration/actions.go b/integration/actions.go index ec9879c383..d080a2d232 100644 --- a/integration/actions.go +++ b/integration/actions.go @@ -78,6 +78,15 @@ func Chain(actions ...Action) Action { } } +// Repeat an action N times. +func Repeat(n int, action Action) Action { + return func(t testing.TB, ic TestContext) { + for i := 0; i < n; i++ { + action(t, ic) + } + } +} + // Chdir changes the test working directory to the subdirectory for the duration of the action. func Chdir(dir string, a Action) Action { return func(t testing.TB, ic TestContext) { @@ -288,7 +297,7 @@ func QueryRow(database string, query string, expected ...interface{}) Action { return func(t testing.TB, ic TestContext) { actual := GetRow(t, ic, database, query, len(expected)) for i, a := range actual { - assert.Equal(t, a, expected[i]) + assert.Equal(t, expected[i], a) } } }