feat: pubsub manager (#1673)
Closes #1596, closes #1642

How it works:
- Pubsub managers poll for subscriptions that are behind their topic's head
    - these subscriptions are locked for the duration of the transaction
    - subscriptions already locked by another controller are skipped
- The manager then finds the next event for each subscription
- If that event is newer than our artificial consumption delay, the subscription is skipped on this attempt (a sketch of this pass follows the list)
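
As an illustration, here is a minimal sketch of one polling pass. The types and store methods are hypothetical stand-ins for the real DAL; the actual implementation is `ProgressSubscriptions` in backend/controller/dal/pubsub.go, shown in the diff below.

```go
// Sketch only: Subscription, Event and the store interface are hypothetical
// stand-ins for the real DAL types used by the pubsub manager.
package pubsub

import (
	"context"
	"time"
)

type Subscription struct {
	Key    string
	Topic  string
	Cursor int64
}

type Event struct {
	CreatedAt time.Time
	Payload   []byte
}

type store interface {
	// Returns subscriptions behind their topic's head. The query is assumed to
	// lock each returned row and skip rows already locked by another
	// controller (e.g. FOR UPDATE SKIP LOCKED).
	SubscriptionsNeedingUpdate(ctx context.Context) ([]Subscription, error)
	// Returns the first event on the topic after the subscription's cursor.
	NextEvent(ctx context.Context, topic string, afterCursor int64) (Event, error)
	// Moves the cursor and schedules an async call to the subscriber's sink.
	ScheduleAsyncCall(ctx context.Context, sub Subscription, ev Event) error
}

// progressOnce is a single polling pass: it advances every unlocked
// subscription that has an old-enough event waiting, and reports how many it
// advanced.
func progressOnce(ctx context.Context, db store, consumptionDelay time.Duration) (int, error) {
	subs, err := db.SubscriptionsNeedingUpdate(ctx)
	if err != nil {
		return 0, err
	}
	progressed := 0
	for _, sub := range subs {
		ev, err := db.NextEvent(ctx, sub.Topic, sub.Cursor)
		if err != nil {
			return progressed, err
		}
		// Skip events newer than the artificial consumption delay; a later
		// poll will pick them up once they are old enough.
		if time.Since(ev.CreatedAt) < consumptionDelay {
			continue
		}
		if err := db.ScheduleAsyncCall(ctx, sub, ev); err != nil {
			return progressed, err
		}
		progressed++
	}
	return progressed, nil
}
```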

Pubsub also speeds things up by:
- Polling once a second
- Notifying the controller when it schedules an async call, so the controller can attempt to pick it up straight away
    - this is wrapped in a mutex so that we don't accidentally end up with more and more async calls running at once on a single controller
- Notifying pubsub when the async call completes, so that it can immediately try progressing subscriptions again (see the sketch after this list)
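
For illustration, a minimal sketch of that notification-plus-mutex pattern on the controller side, under assumed names (`service`, `asyncCallWasAdded`); the real method is `AsyncCallWasAdded` and the mutex is `asyncCallsLock`, both visible in the controller.go diff below.

```go
// Sketch only: names here are illustrative; the real controller adds an
// asyncCallsLock around its existing executeAsyncCalls method.
package controller

import (
	"context"
	"log"
	"sync"
)

type service struct {
	// Serialises async call execution. Both the periodic poll and the
	// "async call was added" notification funnel into executeAsyncCalls,
	// and the mutex ensures only one execution runs at a time per controller.
	asyncCallsLock sync.Mutex
}

// asyncCallWasAdded lets pubsub nudge the controller right after it schedules
// an async call, instead of waiting for the next poll tick.
func (s *service) asyncCallWasAdded(ctx context.Context) {
	go func() {
		if err := s.executeAsyncCalls(ctx); err != nil {
			// The next scheduled poll will retry, so just log here.
			log.Printf("failed to progress async calls: %v", err)
		}
	}()
}

func (s *service) executeAsyncCalls(ctx context.Context) error {
	// Multiple entry points, one execution at a time.
	s.asyncCallsLock.Lock()
	defer s.asyncCallsLock.Unlock()

	// ... acquire an async call, execute it, and on commit notify pubsub so
	// it can immediately try progressing subscriptions again.
	return nil
}
```
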
matt2e authored Jun 6, 2024
1 parent 48839a4 commit c9a8561
Showing 11 changed files with 281 additions and 246 deletions.
34 changes: 32 additions & 2 deletions backend/controller/controller.go
@@ -198,6 +198,7 @@ type Service struct {
runnerScaling scaling.RunnerScaling

increaseReplicaFailures map[string]int
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
@@ -231,9 +232,8 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

pubSub := pubsub.New(ctx, key, db)
pubSub := pubsub.New(ctx, db, svc.tasks, svc)
svc.pubSub = pubSub
svc.controllerListListeners = append(svc.controllerListListeners, pubSub)

go svc.syncSchema(ctx)

@@ -1198,7 +1198,22 @@ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) {
return time.Second, nil
}

// AsyncCallWasAdded is an optional notification that an async call was added by this controller
//
// It allows us to speed up execution of scheduled async calls rather than waiting for the next poll time.
func (s *Service) AsyncCallWasAdded(ctx context.Context) {
go func() {
if _, err := s.executeAsyncCalls(ctx); err != nil {
log.FromContext(ctx).Errorf(err, "failed to progress subscriptions")
}
}()
}

func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) {
// There are multiple entry points into this function, but we want the controller to handle async calls one at a time.
s.asyncCallsLock.Lock()
defer s.asyncCallsLock.Unlock()

logger := log.FromContext(ctx)
logger.Tracef("Acquiring async call")

@@ -1238,6 +1253,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
// Will retry, do not propagate failure yet.
return nil
}
// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed)
@@ -1252,6 +1268,20 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
if err != nil {
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
go func() {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
break

case dal.AsyncOriginPubSub:
s.pubSub.AsyncCallDidCommit(ctx, origin)

default:
break
}
}()

return 0, nil
}

76 changes: 49 additions & 27 deletions backend/controller/dal/pubsub.go
@@ -3,12 +3,13 @@ package dal
import (
"context"
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
)

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error {
@@ -39,43 +40,64 @@ func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscr
}), nil
}

func (d *DAL) ProgressSubscription(ctx context.Context, subscription model.Subscription) error {
func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) {
tx, err := d.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
return 0, fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
if err != nil {
return fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
}
logger := log.FromContext(ctx)

result, err := tx.db.LockSubscriptionAndGetSink(ctx, subscription.Key, subscription.Cursor)
// get subscriptions needing update
// also gets a lock on the subscription, and skips any subscriptions locked by others
subs, err := tx.db.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
return 0, fmt.Errorf("could not get subscriptions to progress: %w", translatePGError(err))
}

err = tx.db.BeginConsumingTopicEvent(ctx, optional.Some(result.SubscriptionID), nextCursor.Event)
if err != nil {
return fmt.Errorf("failed to progress subscription: %w", translatePGError(err))
}
successful := 0
for _, subscription := range subs {
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
if err != nil {
return 0, fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
return 0, fmt.Errorf("could not find event to progress subscription: %w", translatePGError(err))
}
if nextCreatedAt, ok := nextCursor.CreatedAt.Get(); ok && nextCreatedAt.Add(eventConsumptionDelay).After(time.Now()) {
logger.Tracef("Skipping subscription %s because event is too new", subscription.Key)
continue
}

origin := AsyncOriginPubSub{
Subscription: schema.RefKey{
Module: subscription.Key.Payload.Module,
Name: subscription.Key.Payload.Name,
},
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: result.Sink,
Origin: origin.String(),
Request: nextCursor.Payload,
})
if err != nil {
return fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
sink, err := tx.db.GetRandomSubscriberSink(ctx, subscription.Key)
if err != nil {
return 0, fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
}

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
if err != nil {
return 0, fmt.Errorf("failed to progress subscription: %w", translatePGError(err))
}

origin := AsyncOriginPubSub{
Subscription: schema.RefKey{
Module: subscription.Key.Payload.Module,
Name: subscription.Key.Payload.Name,
},
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: sink,
Origin: origin.String(),
Request: nextCursor.Payload,
})
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
}
successful++
}
return nil
return successful, nil
}

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
56 changes: 49 additions & 7 deletions backend/controller/pubsub/integration_test.go
@@ -13,20 +13,20 @@ import (
)

func TestPubSub(t *testing.T) {
calls := 20
events := calls * 10
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish 2 events
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
// publish events
in.Repeat(calls, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),

// TODO: speed this up when we have proper pubsub implementation
in.Sleep(time.Second*4*2),
in.Sleep(time.Second*4),

// check that there are 2 successful async calls
// check that there are the right number of successful async calls
in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
@@ -35,6 +35,48 @@ func TestPubSub(t *testing.T) {
state = 'success'
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "test_subscription"}}.String()),
2),
events),
)
}

func TestPubSubConsumptionDelay(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish events with a small delay between each
// pubsub should trigger its poll a few times during this period
// each time it should continue processing each event until it reaches one that is too new to process
func(t testing.TB, ic in.TestContext) {
for i := 0; i < 60; i++ {
in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic)
time.Sleep(time.Millisecond * 50)
}
},

in.Sleep(time.Second*2),

// Get the created_at of each topic event and of each async call,
// pair them up in order, and make sure no pair is within 0.2s of each other
in.QueryRow("ftl", `
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from topic_events order by created_at
)
),
async_call_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from async_calls ac order by created_at
)
)
SELECT COUNT(*)
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2;
`, 0),
)
}