Skip to content

Commit

Permalink
add safety checks for multiple events added at once
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 5, 2024
1 parent 9c0e77f commit 6e64d92
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 14 deletions.
14 changes: 12 additions & 2 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package dal
import (
"context"
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
)
Expand Down Expand Up @@ -38,20 +40,23 @@ func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscr
}), nil
}

func (d *DAL) ProgressSubscriptions(ctx context.Context) (count int, err error) {
func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) {
tx, err := d.Begin(ctx)
if err != nil {
return 0, fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

logger := log.FromContext(ctx)

// get subscriptions needing update
// also gets a lock on the subscription, and skips any subscriptions locked by others
subs, err := tx.db.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return 0, fmt.Errorf("could not get subscriptions to progress: %w", translatePGError(err))
}

successful := 0
for _, subscription := range subs {
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
if err != nil {
Expand All @@ -61,6 +66,10 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context) (count int, err error)
if !ok {
return 0, fmt.Errorf("could not find event to progress subscription: %w", translatePGError(err))
}
if nextCreatedAt, ok := nextCursor.CreatedAt.Get(); ok && nextCreatedAt.Add(eventConsumptionDelay).After(time.Now()) {
logger.Tracef("Skipping subscription %s because event is too new", subscription.Key)
continue
}

sink, err := tx.db.GetRandomSubscriberSink(ctx, subscription.Key)
if err != nil {
Expand All @@ -86,8 +95,9 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context) (count int, err error)
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
}
successful++
}
return len(subs), nil
return successful, nil
}

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
Expand Down
11 changes: 9 additions & 2 deletions backend/controller/pubsub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ import (
"github.com/jpillora/backoff"
)

const (
// eventConsumptionDelay is the delay passed to ProgressSubscriptions before an event is consumed.
//
// Events can be added simultaneously, which can result in events with out-of-order created_at values.
// By adding a delay, we ensure that by the time we read the events, no new events will be added
// with earlier created_at values.
eventConsumptionDelay = 500 * time.Millisecond
)

type DAL interface {
ProgressSubscriptions(ctx context.Context) (count int, err error)
ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error)
CompleteEventForSubscription(ctx context.Context, module, name string) error
}

Expand Down Expand Up @@ -46,7 +53,7 @@ func New(ctx context.Context, dal *dal.DAL, scheduler Scheduler, asyncCallListen
}

func (m *Manager) progressSubscriptions(ctx context.Context) (time.Duration, error) {
count, err := m.dal.ProgressSubscriptions(ctx)
count, err := m.dal.ProgressSubscriptions(ctx, eventConsumptionDelay)
if err != nil {
return 0, err
}
Expand Down
3 changes: 3 additions & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -678,16 +678,20 @@ VALUES (
);

-- name: GetSubscriptionsNeedingUpdate :many
-- Results may not be ready to be scheduled yet due to event consumption delay
-- Sorting ensures that brand new events (that may not be ready for consumption)
-- don't prevent older events from being consumed
SELECT
subs.key::subscription_key as key,
topic_events.key as cursor,
curser.key as cursor,
topics.key::topic_key as topic,
subs.name
FROM topic_subscriptions subs
LEFT JOIN topics ON subs.topic_id = topics.id
LEFT JOIN topic_events ON subs.cursor = topic_events.id
LEFT JOIN topic_events curser ON subs.cursor = curser.id
WHERE subs.cursor IS DISTINCT FROM topics.head
AND subs.state = 'idle'
ORDER BY curser.created_at
LIMIT 3
FOR UPDATE OF subs SKIP LOCKED;

Expand All @@ -700,7 +704,8 @@ WITH cursor AS (
WHERE "key" = sqlc.narg('cursor')::topic_event_key
)
SELECT events."key" as event,
events.payload
events.payload,
events.created_at
FROM topics
LEFT JOIN topic_events as events ON events.topic_id = topics.id
WHERE topics.key = sqlc.arg('topic')::topic_key
Expand Down
18 changes: 12 additions & 6 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion backend/controller/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,18 @@ CREATE TRIGGER topic_events_notify_event
EXECUTE PROCEDURE notify_event();

-- Automatically update the head of a topic to point at its most recent event.
-- The subquery ensures that the head is calculated correctly when two events are added at the same time.
CREATE OR REPLACE FUNCTION topics_update_head() RETURNS TRIGGER AS
$$
BEGIN
UPDATE topics
SET head = NEW.id
SET head = (
SELECT id
FROM topic_events
WHERE topic_id = NEW.topic_id
ORDER BY created_at DESC, id DESC
LIMIT 1
)
WHERE id = NEW.topic_id;
RETURN NEW;
END;
Expand Down

0 comments on commit 6e64d92

Please sign in to comment.