Skip to content

Commit

Permalink
add tests, improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 6, 2024
1 parent 5b15fbc commit b36daf8
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 19 deletions.
3 changes: 2 additions & 1 deletion backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t

subscriber, err := tx.db.GetRandomSubscriber(ctx, subscription.Key)
if err != nil {
return 0, fmt.Errorf("failed to get lock on subscription: %w", translatePGError(err))
logger.Tracef("no subscriber for subscription %s", subscription.Key)
continue
}

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
Expand Down
32 changes: 29 additions & 3 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPubSub(t *testing.T) {
)
}

func TestPubSubConsumptionDelay(t *testing.T) {
func TestConsumptionDelay(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
Expand All @@ -50,9 +50,9 @@ func TestPubSubConsumptionDelay(t *testing.T) {
// pubsub should trigger its poll a few times during this period
// each time it should continue processing each event until it reaches one that is too new to process
func(t testing.TB, ic in.TestContext) {
for i := 0; i < 60; i++ {
for i := 0; i < 200; i++ {
in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic)
time.Sleep(time.Millisecond * 50)
time.Sleep(time.Millisecond * 20)
}
},

Expand Down Expand Up @@ -80,3 +80,29 @@ func TestPubSubConsumptionDelay(t *testing.T) {
`, 0),
)
}

func TestRetry(t *testing.T) {
retriesPerCall := 2
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish events
in.Call("publisher", "publishOneToTopic2", in.Obj{}, func(t testing.TB, resp in.Obj) {}),

in.Sleep(time.Second*6),

// check that there are the right amount of failed async calls
in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
FROM async_calls
WHERE
state = 'error'
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomed_subscription"}}.String()),
1+retriesPerCall),
)
}
25 changes: 14 additions & 11 deletions backend/controller/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,25 @@ func PublishTen(ctx context.Context) error {
if err != nil {
return err
}
time.Sleep(time.Microsecond * 20)
}
return nil
}

//ftl:verb
func PublishOne(ctx context.Context) error {
logger := ftl.LoggerFromContext(ctx)
for i := 0; i < 10; i++ {
t := time.Now()
logger.Infof("Publishing %v", t)
err := topic.Publish(ctx, PubSubEvent{Time: t})
if err != nil {
return err
}
time.Sleep(time.Microsecond * 20)
}
return nil
t := time.Now()
logger.Infof("Publishing %v", t)
return topic.Publish(ctx, PubSubEvent{Time: t})
}

//ftl:export
var topic2 = ftl.Topic[PubSubEvent]("topic2")

//ftl:verb
func PublishOneToTopic2(ctx context.Context) error {
logger := ftl.LoggerFromContext(ctx)
t := time.Now()
logger.Infof("Publishing to topic2 %v", t)
return topic2.Publish(ctx, PubSubEvent{Time: t})
}
13 changes: 11 additions & 2 deletions backend/controller/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package subscriber

import (
"context"
"fmt"
"ftl/publisher"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
Expand All @@ -12,7 +13,15 @@ var _ = ftl.Subscription(publisher.Test_topic, "test_subscription")
//ftl:verb
//ftl:subscribe test_subscription
func Consume(ctx context.Context, req publisher.PubSubEvent) error {
logger := ftl.LoggerFromContext(ctx)
logger.Infof("Subscriber is processing %v", req.Time)
ftl.LoggerFromContext(ctx).Infof("Subscriber is consuming %v", req.Time)
return nil
}

var _ = ftl.Subscription(publisher.Topic2, "doomed_subscription")

//ftl:verb
//ftl:subscribe doomed_subscription
//ftl:retry 2 1s 1s
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}
5 changes: 5 additions & 0 deletions backend/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ func TestParsing(t *testing.T) {
verb consumesBothASubs(test.eventA) Unit
+subscribe subA1
+subscribe subA2
+retry 1m5s 1h
}
`,
expected: &Schema{
Expand Down Expand Up @@ -630,6 +631,10 @@ func TestParsing(t *testing.T) {
&MetadataSubscriber{
Name: "subA2",
},
&MetadataRetry{
MinBackoff: "1m5s",
MaxBackoff: "1h",
},
},
},
}},
Expand Down
4 changes: 2 additions & 2 deletions backend/schema/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ func TestValidate(t *testing.T) {
}
`,
errs: []string{
`4:7-7: verb A: retries can only be added to FSM transitions`,
`6:7-7: verb B: retries can only be added to FSM transitions`,
`4:7-7: verb A: retries can only be added to subscribers or FSM transitions`,
`6:7-7: verb B: retries can only be added to subscribers or FSM transitions`,
},
},
{name: "InvalidRetryDurations",
Expand Down
1 change: 1 addition & 0 deletions go-runtime/compile/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func TestExtractModulePubSub(t *testing.T) {
verb processBroadcast(pubsub.PayinEvent) Unit
+subscribe broadcastSubscription
+retry 10 1s
verb processPayin(pubsub.PayinEvent) Unit
+subscribe paymentProcessing
Expand Down
1 change: 1 addition & 0 deletions go-runtime/compile/testdata/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func Broadcast(ctx context.Context) error {
}

//ftl:subscribe broadcastSubscription
//ftl:retry 10 1s
func ProcessBroadcast(ctx context.Context, event PayinEvent) error {
logger := ftl.LoggerFromContext(ctx)
logger.Infof("Received broadcast event: %v", event)
Expand Down

0 comments on commit b36daf8

Please sign in to comment.