Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pubsub manager #1673

Merged
merged 7 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ type Service struct {
runnerScaling scaling.RunnerScaling

increaseReplicaFailures map[string]int
asyncCallsLock sync.Mutex
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
Expand Down Expand Up @@ -231,9 +232,8 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

pubSub := pubsub.New(ctx, key, db)
pubSub := pubsub.New(ctx, db, svc.tasks, svc)
svc.pubSub = pubSub
svc.controllerListListeners = append(svc.controllerListListeners, pubSub)

go svc.syncSchema(ctx)

Expand Down Expand Up @@ -1198,7 +1198,22 @@ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) {
return time.Second, nil
}

// AsyncCallWasAdded is an optional notification that an async call was added by this controller
//
// It allows us to speed up execution of scheduled async calls rather than waiting for the next poll time.
func (s *Service) AsyncCallWasAdded(ctx context.Context) {
go func() {
if _, err := s.executeAsyncCalls(ctx); err != nil {
log.FromContext(ctx).Errorf(err, "failed to progress subscriptions")
}
}()
}

func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) {
// There are multiple entry points into this function, but we want the controller to handle async calls one at a time.
s.asyncCallsLock.Lock()
defer s.asyncCallsLock.Unlock()

logger := log.FromContext(ctx)
logger.Tracef("Acquiring async call")

Expand Down Expand Up @@ -1238,6 +1253,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
// Will retry, do not propagate failure yet.
return nil
}
// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed)
Expand All @@ -1252,6 +1268,20 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
if err != nil {
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
go func() {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
break

case dal.AsyncOriginPubSub:
s.pubSub.AsyncCallDidCommit(ctx, origin)

default:
break
}
}()

return 0, nil
}

Expand Down
76 changes: 49 additions & 27 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package dal
import (
"context"
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
)

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error {
Expand Down Expand Up @@ -39,43 +40,64 @@ func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscr
}), nil
}

func (d *DAL) ProgressSubscription(ctx context.Context, subscription model.Subscription) error {
func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) {
tx, err := d.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
return 0, fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
if err != nil {
return fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
}
logger := log.FromContext(ctx)

result, err := tx.db.LockSubscriptionAndGetSink(ctx, subscription.Key, subscription.Cursor)
// get subscriptions needing update
// also gets a lock on the subscription, and skips any subscriptions locked by others
subs, err := tx.db.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
return 0, fmt.Errorf("could not get subscriptions to progress: %w", translatePGError(err))
}

err = tx.db.BeginConsumingTopicEvent(ctx, optional.Some(result.SubscriptionID), nextCursor.Event)
if err != nil {
return fmt.Errorf("failed to progress subscription: %w", translatePGError(err))
}
successful := 0
for _, subscription := range subs {
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
if err != nil {
return 0, fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
return 0, fmt.Errorf("could not find event to progress subscription: %w", translatePGError(err))
}
if nextCreatedAt, ok := nextCursor.CreatedAt.Get(); ok && nextCreatedAt.Add(eventConsumptionDelay).After(time.Now()) {
logger.Tracef("Skipping subscription %s because event is too new", subscription.Key)
continue
}

origin := AsyncOriginPubSub{
Subscription: schema.RefKey{
Module: subscription.Key.Payload.Module,
Name: subscription.Key.Payload.Name,
},
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: result.Sink,
Origin: origin.String(),
Request: nextCursor.Payload,
})
if err != nil {
return fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
sink, err := tx.db.GetRandomSubscriberSink(ctx, subscription.Key)
if err != nil {
return 0, fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
}

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
if err != nil {
return 0, fmt.Errorf("failed to progress subscription: %w", translatePGError(err))
}

origin := AsyncOriginPubSub{
Subscription: schema.RefKey{
Module: subscription.Key.Payload.Module,
Name: subscription.Key.Payload.Name,
},
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: sink,
Origin: origin.String(),
Request: nextCursor.Payload,
})
if err != nil {
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
}
successful++
}
return nil
return successful, nil
}

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
Expand Down
56 changes: 49 additions & 7 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ import (
)

func TestPubSub(t *testing.T) {
calls := 20
events := calls * 10
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish 2 events
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
// publish events
in.Repeat(calls, in.Call("publisher", "publishTen", in.Obj{}, func(t testing.TB, resp in.Obj) {})),

// TODO: speed this up when we have proper pubsub implementation
in.Sleep(time.Second*4*2),
in.Sleep(time.Second*4),

// check that there are 2 successful async calls
// check that there are the right amount of successful async calls
in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
Expand All @@ -35,6 +35,48 @@ func TestPubSub(t *testing.T) {
state = 'success'
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "test_subscription"}}.String()),
2),
events),
)
}

func TestPubSubConsumptionDelay(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish events with a small delay between each
// pubsub should trigger its poll a few times during this period
// each time it should continue processing each event until it reaches one that is too new to process
func(t testing.TB, ic in.TestContext) {
for i := 0; i < 60; i++ {
in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic)
time.Sleep(time.Millisecond * 50)
}
},

in.Sleep(time.Second*2),

// Get all event created ats, and all async call created ats
// Compare each, make sure none are less than 0.2s of each other
in.QueryRow("ftl", `
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from topic_events order by created_at
)
),
async_call_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from async_calls ac order by created_at
)
)
SELECT COUNT(*)
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2;
`, 0),
)
}
Loading
Loading