Skip to content

Commit

Permalink
allow generating test topics and sending non-empty messages (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak authored Sep 5, 2024
1 parent ca938e0 commit fc5e4fc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pubsub/tests/bench_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type BenchmarkPubSubConstructor func(n int) (message.Publisher, message.Subscrib
// BenchSubscriber runs benchmark on a message Subscriber.
func BenchSubscriber(b *testing.B, pubSubConstructor BenchmarkPubSubConstructor) {
pub, sub := pubSubConstructor(b.N)
topicName := testTopicName(NewTestID())
topicName := testTopicName(TestContext{TestID: NewTestID()})

messages, err := sub.Subscribe(context.Background(), topicName)
if err != nil {
Expand Down
61 changes: 34 additions & 27 deletions pubsub/tests/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type Features struct {
// NewSubscriberReceivesOldMessages should be set to true if messages are persisted even
// if they are already consumed (for example, like in Kafka).
NewSubscriberReceivesOldMessages bool

// GenerateTopicFunc overrides standard topic name generation.
GenerateTopicFunc func(tctx TestContext) string
}

// RunOnlyFastTests returns true if -short flag was provided -race was not provided.
Expand Down Expand Up @@ -218,7 +221,7 @@ func TestPublishSubscribe(
) {
pub, sub := pubSubConstructor(t)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -267,7 +270,7 @@ func TestConcurrentSubscribe(
pub, initSub := pubSubConstructor(t)
defer closePubSub(t, pub, initSub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

messagesCount := 5000
subscribersCount := 50
Expand Down Expand Up @@ -322,7 +325,7 @@ func TestConcurrentSubscribeMultipleTopics(
for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))
messagesToPublish = append(messagesToPublish, msg)
}

Expand All @@ -332,7 +335,7 @@ func TestConcurrentSubscribeMultipleTopics(
receivedMessagesCh := make(chan message.Messages, topicsCount)

for i := 0; i < topicsCount; i++ {
topicName := testTopicName(tCtx.TestID) + fmt.Sprintf("-%d", i)
topicName := testTopicName(tCtx) + fmt.Sprintf("-%d", i)

var messagesToPublishForTopic []*message.Message
for _, msg := range messagesToPublish {
Expand Down Expand Up @@ -397,7 +400,7 @@ func TestPublishSubscribeInOrder(
pub, initSub := pubSubConstructor(t)
defer closePubSub(t, pub, initSub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := initSub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -467,7 +470,7 @@ func TestResendOnError(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -520,7 +523,7 @@ func TestNoAck(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand All @@ -530,7 +533,7 @@ func TestNoAck(
id := watermill.NewUUID()
log.Printf("sending %s", id)

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))

err := publishWithRetry(pub, topicName, msg)
require.NoError(t, err)
Expand Down Expand Up @@ -607,7 +610,7 @@ func TestContinueAfterSubscribeClose(
pub, sub := createPubSub(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -688,7 +691,7 @@ func TestConcurrentClose(
t.Skip("ExactlyOnceDelivery test is not supported yet")
}

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
createPub, createSub := createPubSub(t)
if subscribeInitializer, ok := createSub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -738,7 +741,7 @@ func TestContinueAfterErrors(
pub, sub := createPubSub(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -804,7 +807,7 @@ func TestConsumerGroups(
publisherPub, publisherSub := pubSubConstructor(t, "test_"+watermill.NewUUID())
defer closePubSub(t, publisherPub, publisherSub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := publisherSub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand All @@ -828,7 +831,7 @@ func TestPublisherClose(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -856,8 +859,8 @@ func TestTopic(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topic1 := testTopicName(tCtx.TestID) + "-1"
topic2 := testTopicName(tCtx.TestID) + "-2"
topic1 := testTopicName(tCtx) + "-1"
topic2 := testTopicName(tCtx) + "-2"

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topic1))
Expand All @@ -866,8 +869,8 @@ func TestTopic(
require.NoError(t, subscribeInitializer.SubscribeInitialize(topic2))
}

topic1Msg := message.NewMessage(watermill.NewUUID(), nil)
topic2Msg := message.NewMessage(watermill.NewUUID(), nil)
topic1Msg := message.NewMessage(watermill.NewUUID(), []byte("x"))
topic2Msg := message.NewMessage(watermill.NewUUID(), []byte("x"))

require.NoError(t, publishWithRetry(pub, topic1, topic1Msg))
require.NoError(t, publishWithRetry(pub, topic2, topic2Msg))
Expand Down Expand Up @@ -904,12 +907,12 @@ func TestMessageCtx(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}

msg := message.NewMessage(watermill.NewUUID(), nil)
msg := message.NewMessage(watermill.NewUUID(), []byte("x"))

// ensuring that context is not propagated via pub/sub
ctx, ctxCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -986,7 +989,7 @@ func TestSubscribeCtx(
ctxWithCancel, cancel := context.WithCancel(context.Background())
ctxWithCancel = context.WithValue(ctxWithCancel, contextKey("foo"), "bar")

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -1037,7 +1040,7 @@ func TestReconnect(

pub, sub := pubSubConstructor(t)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -1078,7 +1081,7 @@ func TestReconnect(
go func() {
for range publishMessage {
id := watermill.NewUUID()
msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))

for {
fmt.Println("publishing message")
Expand Down Expand Up @@ -1123,7 +1126,7 @@ func TestNewSubscriberReceivesOldMessages(

pub, sub := pubSubConstructor(t)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -1219,8 +1222,12 @@ func assertConsumerGroupReceivedMessages(
AssertAllMessagesReceived(t, expectedMessages, receivedMessages)
}

func testTopicName(testID TestID) string {
return "topic-" + string(testID)
func testTopicName(tctx TestContext) string {
if tctx.Features.GenerateTopicFunc != nil {
return tctx.Features.GenerateTopicFunc(tctx)
}

return "topic-" + string(tctx.TestID)
}

func closePubSub(t *testing.T, pub message.Publisher, sub message.Subscriber) {
Expand Down Expand Up @@ -1254,7 +1261,7 @@ func PublishSimpleMessages(t *testing.T, messagesCount int, publisher message.Pu
for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))
messagesToPublish = append(messagesToPublish, msg)

err := publishWithRetry(publisher, topicName, msg)
Expand Down Expand Up @@ -1286,7 +1293,7 @@ func AddSimpleMessagesParallel(t *testing.T, messagesCount int, publisher messag
for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))
messagesToPublish = append(messagesToPublish, msg)

publishMsg <- msg
Expand Down

0 comments on commit fc5e4fc

Please sign in to comment.