Skip to content

Commit

Permalink
feat: pubsub db changes (#1641)
Browse files Browse the repository at this point in the history
- Added db queries for subscriptions to consume events by scheduling
async tasks
- When the async task completes, the subscription state is then ready to
consume the next event
- Multiple controllers can attempt to progress the same subscriptions
but only one will succeed
- Demo implementation of a pubsub manager included which just polls for
subscriptions to attempt every 3s

Known issue:
#1642
Delaying that work so we can unblock others and get the basics working
first.

closes #1595
  • Loading branch information
matt2e authored Jun 5, 2024
1 parent c62b780 commit 306800e
Show file tree
Hide file tree
Showing 24 changed files with 1,090 additions and 19 deletions.
9 changes: 9 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scaling/localscaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
Expand Down Expand Up @@ -182,6 +183,7 @@ type Service struct {

tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
pubSub *pubsub.Manager
controllerListListeners []ControllerListListener

// Map from endpoint to client.
Expand Down Expand Up @@ -228,6 +230,10 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

pubSub := pubsub.New(ctx, key, db)
svc.pubSub = pubSub
svc.controllerListListeners = append(svc.controllerListListeners, pubSub)

go svc.syncSchema(ctx)

// Use min, max backoff if we are running in production, otherwise use
Expand Down Expand Up @@ -1233,6 +1239,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
case dal.AsyncOriginFSM:
return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed)

case dal.AsyncOriginPubSub:
return s.pubSub.OnCallCompletion(ctx, tx, origin, failed)

default:
panic(fmt.Errorf("unsupported async call origin: %v", call.Origin))
}
Expand Down
15 changes: 14 additions & 1 deletion backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type asyncOriginParseRoot struct {
}

var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot](
participle.Union[AsyncOrigin](AsyncOriginFSM{}),
participle.Union[AsyncOrigin](AsyncOriginFSM{}, AsyncOriginPubSub{}),
)

// AsyncOrigin is a sum type representing the originator of an async call.
Expand All @@ -46,6 +46,19 @@ func (AsyncOriginFSM) asyncOrigin() {}
func (a AsyncOriginFSM) Origin() string { return "fsm" }
func (a AsyncOriginFSM) String() string { return fmt.Sprintf("fsm:%s:%s", a.FSM, a.Key) }

// AsyncOriginPubSub represents the context for the originator of an PubSub async call.
//
// It is in the form fsm:<module>.<subscription_name>
type AsyncOriginPubSub struct {
Subscription schema.RefKey `parser:"'sub' ':' @@"`
}

var _ AsyncOrigin = AsyncOriginPubSub{}

func (AsyncOriginPubSub) asyncOrigin() {}
func (a AsyncOriginPubSub) Origin() string { return "sub" }
func (a AsyncOriginPubSub) String() string { return fmt.Sprintf("sub:%s", a.Subscription) }

// ParseAsyncOrigin parses an async origin key.
func ParseAsyncOrigin(origin string) (AsyncOrigin, error) {
root, err := asyncOriginParser.ParseString("", origin)
Expand Down
6 changes: 5 additions & 1 deletion backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,12 +592,16 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
if !ok {
continue
}
sinkRef := schema.RefKey{
Module: moduleSchema.Name,
Name: v.Name,
}
err := tx.InsertSubscriber(ctx, sql.InsertSubscriberParams{
Key: model.NewSubscriberKey(moduleSchema.Name, s.Name, v.Name),
Module: moduleSchema.Name,
SubscriptionName: s.Name,
Deployment: deploymentKey,
Sink: v.Name,
Sink: sinkRef,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("could not insert subscriber: %w", translatePGError(err))
Expand Down
1 change: 1 addition & 0 deletions backend/controller/dal/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestLease(t *testing.T) {
dal, err := New(ctx, conn)
assert.NoError(t, err)

// TTL is too short, expect an error
_, err = dal.AcquireLease(ctx, leases.SystemKey("test"), time.Second*1)
assert.Error(t, err)

Expand Down
66 changes: 66 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package dal

import (
"context"
"fmt"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
)

func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, payload []byte) error {
Expand All @@ -19,3 +23,65 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
}
return nil
}

func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error) {
rows, err := d.db.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return nil, translatePGError(err)
}
return slices.Map(rows, func(row sql.GetSubscriptionsNeedingUpdateRow) model.Subscription {
return model.Subscription{
Name: row.Name,
Key: row.Key,
Topic: row.Topic,
Cursor: row.Cursor,
}
}), nil
}

func (d *DAL) ProgressSubscription(ctx context.Context, subscription model.Subscription) error {
tx, err := d.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

nextCursor, err := tx.db.GetNextEventForSubscription(ctx, subscription.Topic, subscription.Cursor)
if err != nil {
return fmt.Errorf("failed to get next cursor: %w", translatePGError(err))
}

result, err := tx.db.LockSubscriptionAndGetSink(ctx, subscription.Key, subscription.Cursor)
if err != nil {
return fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
}

err = tx.db.BeginConsumingTopicEvent(ctx, optional.Some(result.SubscriptionID), nextCursor.Event)
if err != nil {
return fmt.Errorf("failed to progress subscription: %w", translatePGError(err))
}

origin := AsyncOriginPubSub{
Subscription: schema.RefKey{
Module: subscription.Key.Payload.Module,
Name: subscription.Key.Payload.Name,
},
}
_, err = tx.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: result.Sink,
Origin: origin.String(),
Request: nextCursor.Payload,
})
if err != nil {
return fmt.Errorf("failed to schedule async task for subscription: %w", translatePGError(err))
}
return nil
}

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
err := d.db.CompleteEventForSubscription(ctx, name, module)
if err != nil {
return fmt.Errorf("failed to complete event for subscription: %w", translatePGError(err))
}
return nil
}
40 changes: 40 additions & 0 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//go:build integration

package pubsub

import (
"fmt"
"testing"
"time"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/schema"
in "github.com/TBD54566975/ftl/integration"
)

func TestPubSub(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish 2 events
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
in.Call("publisher", "publish", in.Obj{}, func(t testing.TB, resp in.Obj) {}),

// TODO: speed this up when we have proper pubsub implementation
in.Sleep(time.Second*4*2),

// check that there are 2 successful async calls
in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
FROM async_calls
WHERE
state = 'success'
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "test_subscription"}}.String()),
2),
)
}
162 changes: 162 additions & 0 deletions backend/controller/pubsub/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package pubsub

import (
"context"
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/atomic"
"github.com/serialx/hashring"
)

const (
controllersPerSubscription = 2
)

type DAL interface {
GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscription, error)
ProgressSubscription(ctx context.Context, subscription model.Subscription) error
CompleteEventForSubscription(ctx context.Context, module, name string) error
}

type Manager struct {
key model.ControllerKey
dal DAL
hashRingState atomic.Value[*hashRingState]
}

type hashRingState struct {
hashRing *hashring.HashRing
controllers []dal.Controller
idx int
}

func New(ctx context.Context, key model.ControllerKey, dal *dal.DAL) *Manager {
m := &Manager{
key: key,
dal: dal,
}

go m.watchForUpdates(ctx)
return m
}

func (m *Manager) HandleTopicNotification() {

}

func (m *Manager) HandleEventNotification() {

}

// UpdatedControllerList synchronises the hash ring with the active controllers.
func (m *Manager) UpdatedControllerList(ctx context.Context, controllers []dal.Controller) {
logger := log.FromContext(ctx).Scope("cron")
controllerIdx := -1
for idx, controller := range controllers {
if controller.Key.String() == m.key.String() {
controllerIdx = idx
break
}
}
if controllerIdx == -1 {
logger.Tracef("controller %q not found in list of controllers", m.key)
}

oldState := m.hashRingState.Load()
if oldState != nil && len(oldState.controllers) == len(controllers) {
hasChanged := false
for idx, new := range controllers {
old := oldState.controllers[idx]
if new.Key.String() != old.Key.String() {
hasChanged = true
break
}
}
if !hasChanged {
return
}
}

hashRing := hashring.New(slices.Map(controllers, func(c dal.Controller) string { return c.Key.String() }))
m.hashRingState.Store(&hashRingState{
hashRing: hashRing,
controllers: controllers,
idx: controllerIdx,
})
}

// isResponsibleForSubscription indicates whether a this service should be responsible for attempting jobs,
// or if enough other controllers will handle it. This allows us to spread the job load across controllers.
func (m *Manager) isResponsibleForSubscription(subscription model.Subscription) bool {
hashringState := m.hashRingState.Load()
if hashringState == nil {
return true
}

initialKey, ok := hashringState.hashRing.GetNode(subscription.Key.String())
if !ok {
return true
}

initialIdx := -1
for idx, controller := range hashringState.controllers {
if controller.Key.String() == initialKey {
initialIdx = idx
break
}
}
if initialIdx == -1 {
return true
}

if initialIdx+controllersPerSubscription > len(hashringState.controllers) {
// wraps around
return hashringState.idx >= initialIdx || hashringState.idx < (initialIdx+controllersPerSubscription)-len(hashringState.controllers)
}
return hashringState.idx >= initialIdx && hashringState.idx < initialIdx+controllersPerSubscription
}

func (m *Manager) watchForUpdates(ctx context.Context) {
logger := log.FromContext(ctx).Scope("pubsub")

// TODO: handle events here. Currently a demo implementation
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 3):
if err := m.progressSubscriptions(ctx); err != nil {
logger.Errorf(err, "failed to progress subscriptions")
continue
}
}
}
}

func (m *Manager) progressSubscriptions(ctx context.Context) (err error) {
subscriptions, err := m.dal.GetSubscriptionsNeedingUpdate(ctx)
if err != nil {
return fmt.Errorf("failed to get subscriptions needing update: %w", err)
}
for _, subscription := range subscriptions {
if !m.isResponsibleForSubscription(subscription) {
continue
}
logger := log.FromContext(ctx)

err := m.dal.ProgressSubscription(ctx, subscription)
if err != nil {
logger.Errorf(err, "failed to progress subscription")
}
}
return nil
}

func (m *Manager) OnCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginPubSub, failed bool) error {
return m.dal.CompleteEventForSubscription(ctx, origin.Subscription.Module, origin.Subscription.Name)
}
2 changes: 2 additions & 0 deletions backend/controller/pubsub/testdata/go/publisher/ftl.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module = "publisher"
language = "go"
Loading

0 comments on commit 306800e

Please sign in to comment.