Skip to content

Commit

Permalink
fix: addresses event completion race condition in pubsub test utility (
Browse files Browse the repository at this point in the history
…#2332)

fixes: #2266

The race conditions was caused by two issues.

1. The WaitGroup down tick was not synchronized with the completion of
the subscriber verb execution. Failure to wait for completion may result
in a down time coming before a new event is dispatched (which ticks up
the waitgroup) - which may result in the WaitGroup reaching zero too
soon.
2. Non-linear pubsub networks are not supported by simply counting up
when an event is published and down after it is completed. The up count
needs to match the number of live subscriptions (e.g. subscriptions with
registered subscribers)

This synchronization scheme does not cover asynchronous event dispatch
(e.g. events dispatched via go routines within the subscriber verb) such
scenarios introduce a race condition that cannot be resolved by black
box external synchronization.
  • Loading branch information
jonathanj-square authored Aug 15, 2024
1 parent e339fb6 commit c6d889c
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 45 deletions.
71 changes: 43 additions & 28 deletions go-runtime/ftl/ftltest/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import (
"github.com/TBD54566975/ftl/internal/slices"
)

type topicState struct {
// events published to this topic
events []any
// tracks the number of live subscriptions for this topic
subscriptionCount int
}

type fakePubSub struct {
// all pubsub events are processed through globalTopic
globalTopic *pubsub.Topic[pubSubEvent]
Expand All @@ -25,15 +32,15 @@ type fakePubSub struct {

// pubSubLock required to access [topics, subscriptions, subscribers]
pubSubLock sync.Mutex
topics map[schema.RefKey][]any
topics map[schema.RefKey]*topicState
subscriptions map[string]*subscription
subscribers map[string][]subscriber
}

func newFakePubSub(ctx context.Context) *fakePubSub {
f := &fakePubSub{
globalTopic: pubsub.New[pubSubEvent](),
topics: map[schema.RefKey][]any{},
topics: map[schema.RefKey]*topicState{},
subscriptions: map[string]*subscription{},
subscribers: map[string][]subscriber{},
}
Expand All @@ -42,10 +49,24 @@ func newFakePubSub(ctx context.Context) *fakePubSub {
}

func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error {
// tracks event publication to a topic
f.publishWaitGroup.Add(1)
// tracks event subscription consumption completion
f.publishWaitGroup.Add(f.fetchTopicState(topic).subscriptionCount)
return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event})
}

func (f *fakePubSub) fetchTopicState(topic *schema.Ref) *topicState {
ts, ok := f.topics[topic.ToRefKey()]
if !ok {
ts = &topicState{
events: []any{},
}
f.topics[topic.ToRefKey()] = ts
}
return ts
}

// addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added
func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl.Sink[E]) {
f.pubSubLock.Lock()
Expand All @@ -57,6 +78,7 @@ func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl
topic: sub.Topic,
errors: map[int]error{},
}
f.fetchTopicState(sub.Topic).subscriptionCount++
}

f.subscribers[sub.Name] = append(f.subscribers[sub.Name], func(ctx context.Context, event any) error {
Expand All @@ -77,11 +99,8 @@ func eventsForTopic[E any](ctx context.Context, f *fakePubSub, topic ftl.TopicHa

logger := log.FromContext(ctx).Scope("pubsub")
var events = []E{}
raw, ok := f.topics[topic.Ref.ToRefKey()]
if !ok {
return events
}
for _, e := range raw {
ts := f.fetchTopicState(topic.Ref)
for _, e := range ts.events {
if event, ok := e.(E); ok {
events = append(events, event)
} else {
Expand All @@ -103,17 +122,13 @@ func resultsForSubscription[E any](ctx context.Context, f *fakePubSub, handle ft
if !ok {
return results
}
topic, ok := f.topics[handle.Topic.ToRefKey()]
if !ok {
return results
}

ts := f.fetchTopicState(subscription.topic)
count := subscription.cursor.Default(-1)
if !subscription.isExecuting {
count++
}
for i := range count {
e := topic[i]
e := ts.events[i]
if event, ok := e.(E); ok {
result := SubscriptionResult[E]{
Event: event,
Expand Down Expand Up @@ -155,11 +170,9 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
switch event := e.(type) {
case publishEvent:
logger.Debugf("publishing to %s: %v", event.topic.Name, event.content)
if _, ok := f.topics[event.topic.ToRefKey()]; !ok {
f.topics[event.topic.ToRefKey()] = []any{event.content}
} else {
f.topics[event.topic.ToRefKey()] = append(f.topics[event.topic.ToRefKey()], event.content)
}
ts := f.fetchTopicState(event.topic)
ts.events = append(ts.events, event.content)
// indicate that the event has been published to the topic
f.publishWaitGroup.Done()
case subscriptionDidConsumeEvent:
sub, ok := f.subscriptions[event.subscription]
Expand All @@ -170,20 +183,22 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
sub.errors[sub.cursor.MustGet()] = event.err
}
sub.isExecuting = false
// indicate that the subscription has processed the event
f.publishWaitGroup.Done()
}

for _, sub := range f.subscriptions {
if sub.isExecuting {
// already executing
continue
}
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
ts := f.fetchTopicState(sub.topic)
if len(ts.events) == 0 {
// no events published yet
continue
}
var cursor = sub.cursor.Default(-1)
if len(topicEvents) <= cursor+1 {
if len(ts.events) <= cursor+1 {
// no new events
continue
}
Expand All @@ -200,7 +215,7 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
go func(sub string, chosenSubscriber subscriber, event any) {
err := chosenSubscriber(ctx, event)
f.globalTopic.Publish(subscriptionDidConsumeEvent{subscription: sub, err: err})
}(sub.name, chosenSubscriber, topicEvents[sub.cursor.MustGet()])
}(sub.name, chosenSubscriber, ts.events[sub.cursor.MustGet()])
}
}

Expand Down Expand Up @@ -237,13 +252,13 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr
}
remaining := []remainingState{}
for _, sub := range f.subscriptions {
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
ts := f.fetchTopicState(sub.topic)
if len(ts.events) == 0 {
// no events published yet
continue
}
var cursor = sub.cursor.Default(-1)
if !sub.isExecuting && len(topicEvents) <= cursor+1 {
if !sub.isExecuting && len(ts.events) <= cursor+1 {
// all events have been consumed
continue
}
Expand All @@ -255,7 +270,7 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr
remaining = append(remaining, remainingState{
name: sub.name,
isExecuting: sub.isExecuting,
pendingEvents: len(topicEvents) - cursor - 1,
pendingEvents: len(ts.events) - cursor - 1,
})
}
if len(remaining) == 0 {
Expand Down
25 changes: 21 additions & 4 deletions go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ import (
)

//ftl:export
var Topic = ftl.Topic[Event]("topic")
var subscription = ftl.Subscription(Topic, "subscription")
var Topic1 = ftl.Topic[Event]("topic1")

//ftl:export
var Topic2 = ftl.Topic[Event]("topic2")

var subscription1_1 = ftl.Subscription(Topic1, "subscription_1_1")
var subscription1_2 = ftl.Subscription(Topic1, "subscription_1_2")
var subscription2_1 = ftl.Subscription(Topic2, "subscription_2_1")
var subscription2_2 = ftl.Subscription(Topic2, "subscription_2_3")

//ftl:data
type Event struct {
Expand All @@ -20,10 +27,20 @@ type Event struct {

//ftl:verb
func PublishToTopicOne(ctx context.Context, event Event) error {
return Topic.Publish(ctx, event)
return Topic1.Publish(ctx, event)
}

//ftl:verb
func PropagateToTopic2(ctx context.Context, event Event) error {
return Topic2.Publish(ctx, event)
}

//ftl:verb
func ConsumeEvent(_ context.Context, _ Event) error {
return nil
}

//ftl:subscribe subscription
//ftl:subscribe subscription_1_1
func ErrorsAfterASecond(ctx context.Context, event Event) error {
time.Sleep(1 * time.Second)
return fmt.Errorf("SubscriberThatFails always fails")
Expand Down
57 changes: 50 additions & 7 deletions go-runtime/ftl/ftltest/testdata/go/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,58 @@ import (
func TestSubscriberReturningErrors(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription, ErrorsAfterASecond),
ftltest.WithSubscriber(subscription1_1, ErrorsAfterASecond),
)
count := 5
for i := 0; i < count; i++ {
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)}))
}
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic)))
assert.Equal(t, count, len(ftltest.ErrorsForSubscription(ctx, subscription1_1)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1)))
}

// establishes a pubsub network that forwards from topic 1 to topic 2 on a single subscription
// and does NOT register any subscribers against topic 2's subscription
func TestForwardedEvent(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription1_1, PropagateToTopic2),
)
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"}))
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic2)))
}

// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions
// and does NOT register any subscribers against topic 2's subscriptions
func TestPropagatedEvent(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription1_1, PropagateToTopic2),
ftltest.WithSubscriber(subscription1_2, PropagateToTopic2),
)
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"}))
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2)))
}

// establishes a pubsub network that forwards from topic 1 to topic 2 on two subscriptions
// and consumes from topic 2 via two subscriptions
func TestPropagationNetwork(t *testing.T) {
// Test that we can publish multiple events, which will take time to consume, and that we track each error
ctx := ftltest.Context(
ftltest.WithSubscriber(subscription1_1, PropagateToTopic2),
ftltest.WithSubscriber(subscription1_2, PropagateToTopic2),
ftltest.WithSubscriber(subscription2_1, ConsumeEvent),
ftltest.WithSubscriber(subscription2_2, ConsumeEvent),
)
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: "propagation-test"}))
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, 2, len(ftltest.EventsForTopic(ctx, Topic2)))
}

func TestMultipleMultipleFakeSubscribers(t *testing.T) {
Expand All @@ -32,13 +75,13 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) {
var counter atomic.Value[int]

ctx := ftltest.Context(
ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error {
ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error {
ftl.LoggerFromContext(ctx).Infof("Fake Subscriber A")
current := counter.Load()
counter.Store(current + 1)
return nil
}),
ftltest.WithSubscriber(subscription, func(ctx context.Context, event Event) error {
ftltest.WithSubscriber(subscription1_1, func(ctx context.Context, event Event) error {
ftl.LoggerFromContext(ctx).Infof("Fake Subscriber B")
current := counter.Load()
counter.Store(current + 1)
Expand All @@ -49,7 +92,7 @@ func TestMultipleMultipleFakeSubscribers(t *testing.T) {
assert.NoError(t, PublishToTopicOne(ctx, Event{Value: strconv.Itoa(i)}))
}
ftltest.WaitForSubscriptionsToComplete(ctx)
assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic)))
assert.Equal(t, 0, len(ftltest.ErrorsForSubscription(ctx, subscription1_1)))
assert.Equal(t, count, len(ftltest.EventsForTopic(ctx, Topic1)))
assert.Equal(t, count, counter.Load())
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import (
"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)

var _ = ftl.Subscription(pubsub.Topic, "subscription")
var _ = ftl.Subscription(pubsub.Topic1, "subscription1_1")
13 changes: 8 additions & 5 deletions go-runtime/ftl/ftltest/testdata/go/subscriber/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package subscriber_test

import (
"ftl/pubsub"
"testing"

"github.com/TBD54566975/ftl/go-runtime/ftl/ftltest"
"github.com/alecthomas/assert/v2"
"testing"
)

func TestPublishToExternalModule(t *testing.T) {
ctx := ftltest.Context()
assert.NoError(t, pubsub.Topic.Publish(ctx, pubsub.Event{Value: "external"}))
assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic)))

assert.NoError(t, pubsub.Topic1.Publish(ctx, pubsub.Event{Value: "external"}))

ftltest.WaitForSubscriptionsToComplete(ctx)

assert.Equal(t, 1, len(ftltest.EventsForTopic(ctx, pubsub.Topic1)))

// Make sure we correctly made the right ref for the external module.
assert.Equal(t, "pubsub", pubsub.Topic.Ref.Module)
assert.Equal(t, "pubsub", pubsub.Topic1.Ref.Module)
}

0 comments on commit c6d889c

Please sign in to comment.