diff --git a/pubsub/tests/bench_pubsub.go b/pubsub/tests/bench_pubsub.go index a27ff6afc..4280eaa89 100644 --- a/pubsub/tests/bench_pubsub.go +++ b/pubsub/tests/bench_pubsub.go @@ -15,7 +15,7 @@ type BenchmarkPubSubConstructor func(n int) (message.Publisher, message.Subscrib // BenchSubscriber runs benchmark on a message Subscriber. func BenchSubscriber(b *testing.B, pubSubConstructor BenchmarkPubSubConstructor) { pub, sub := pubSubConstructor(b.N) - topicName := testTopicName(NewTestID()) + topicName := testTopicName(TestContext{TestID: NewTestID()}) messages, err := sub.Subscribe(context.Background(), topicName) if err != nil { diff --git a/pubsub/tests/test_pubsub.go b/pubsub/tests/test_pubsub.go index 809a2f65c..9c0232a78 100644 --- a/pubsub/tests/test_pubsub.go +++ b/pubsub/tests/test_pubsub.go @@ -121,6 +121,9 @@ type Features struct { // NewSubscriberReceivesOldMessages should be set to true if messages are persisted even // if they are already consumed (for example, like in Kafka). NewSubscriberReceivesOldMessages bool + + // GenerateTopicFunc overrides standard topic name generation. + GenerateTopicFunc func(tctx TestContext) string } // RunOnlyFastTests returns true if -short flag was provided -race was not provided. @@ -218,7 +221,7 @@ func TestPublishSubscribe( ) { pub, sub := pubSubConstructor(t) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) @@ -267,7 +270,7 @@ func TestConcurrentSubscribe( pub, initSub := pubSubConstructor(t) defer closePubSub(t, pub, initSub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) messagesCount := 5000 subscribersCount := 50 @@ -322,7 +325,7 @@ func TestConcurrentSubscribeMultipleTopics( for i := 0; i < messagesCount; i++ { id := watermill.NewUUID() - msg := message.NewMessage(id, nil) + msg := message.NewMessage(id, []byte("x")) messagesToPublish = append(messagesToPublish, msg) } @@ -332,7 +335,7 @@ func TestConcurrentSubscribeMultipleTopics( receivedMessagesCh := make(chan message.Messages, topicsCount) for i := 0; i < topicsCount; i++ { - topicName := testTopicName(tCtx.TestID) + fmt.Sprintf("-%d", i) + topicName := testTopicName(tCtx) + fmt.Sprintf("-%d", i) var messagesToPublishForTopic []*message.Message for _, msg := range messagesToPublish { @@ -397,7 +400,7 @@ func TestPublishSubscribeInOrder( pub, initSub := pubSubConstructor(t) defer closePubSub(t, pub, initSub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := initSub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) @@ -467,7 +470,7 @@ func TestResendOnError( pub, sub := pubSubConstructor(t) defer closePubSub(t, pub, sub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) @@ -520,7 +523,7 @@ func TestNoAck( pub, sub := pubSubConstructor(t) defer closePubSub(t, pub, sub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) @@ -530,7 +533,7 @@ func TestNoAck( id := watermill.NewUUID() log.Printf("sending %s", id) - msg := message.NewMessage(id, nil) + msg := message.NewMessage(id, []byte("x")) err := publishWithRetry(pub, topicName, msg) require.NoError(t, err) @@ -607,7 +610,7 @@ func TestContinueAfterSubscribeClose( pub, sub := createPubSub(t) defer closePubSub(t, pub, sub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } @@ -688,7 +691,7 @@ func TestConcurrentClose( t.Skip("ExactlyOnceDelivery test is not supported yet") } - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) createPub, createSub := createPubSub(t) if subscribeInitializer, ok := createSub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) @@ -738,7 +741,7 @@ func TestContinueAfterErrors( pub, sub := createPubSub(t) defer closePubSub(t, pub, sub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } @@ -804,7 +807,7 @@ func TestConsumerGroups( publisherPub, publisherSub := pubSubConstructor(t, "test_"+watermill.NewUUID()) defer closePubSub(t, publisherPub, publisherSub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := publisherSub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } @@ -828,7 +831,7 @@ func TestPublisherClose( pub, sub := pubSubConstructor(t) defer closePubSub(t, pub, sub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } @@ -856,8 +859,8 @@ func TestTopic( pub, sub := pubSubConstructor(t) defer closePubSub(t, pub, sub) - topic1 := testTopicName(tCtx.TestID) + "-1" - topic2 := testTopicName(tCtx.TestID) + "-2" + topic1 := testTopicName(tCtx) + "-1" + topic2 := testTopicName(tCtx) + "-2" if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topic1)) @@ -866,8 +869,8 @@ func TestTopic( require.NoError(t, subscribeInitializer.SubscribeInitialize(topic2)) } - topic1Msg := message.NewMessage(watermill.NewUUID(), nil) - topic2Msg := message.NewMessage(watermill.NewUUID(), nil) + topic1Msg := message.NewMessage(watermill.NewUUID(), []byte("x")) + topic2Msg := message.NewMessage(watermill.NewUUID(), []byte("x")) require.NoError(t, publishWithRetry(pub, topic1, topic1Msg)) require.NoError(t, publishWithRetry(pub, topic2, topic2Msg)) @@ -904,12 +907,12 @@ func TestMessageCtx( pub, sub := pubSubConstructor(t) defer closePubSub(t, pub, sub) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } - msg := message.NewMessage(watermill.NewUUID(), nil) + msg := message.NewMessage(watermill.NewUUID(), []byte("x")) // ensuring that context is not propagated via pub/sub ctx, ctxCancel := context.WithCancel(context.Background()) @@ -986,7 +989,7 @@ func TestSubscribeCtx( ctxWithCancel, cancel := context.WithCancel(context.Background()) ctxWithCancel = context.WithValue(ctxWithCancel, contextKey("foo"), "bar") - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } @@ -1037,7 +1040,7 @@ func TestReconnect( pub, sub := pubSubConstructor(t) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } @@ -1078,7 +1081,7 @@ func TestReconnect( go func() { for range publishMessage { id := watermill.NewUUID() - msg := message.NewMessage(id, nil) + msg := message.NewMessage(id, []byte("x")) for { fmt.Println("publishing message") @@ -1123,7 +1126,7 @@ func TestNewSubscriberReceivesOldMessages( pub, sub := pubSubConstructor(t) - topicName := testTopicName(tCtx.TestID) + topicName := testTopicName(tCtx) if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok { require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName)) } @@ -1219,8 +1222,12 @@ func assertConsumerGroupReceivedMessages( AssertAllMessagesReceived(t, expectedMessages, receivedMessages) } -func testTopicName(testID TestID) string { - return "topic-" + string(testID) +func testTopicName(tctx TestContext) string { + if tctx.Features.GenerateTopicFunc != nil { + return tctx.Features.GenerateTopicFunc(tctx) + } + + return "topic-" + string(tctx.TestID) } func closePubSub(t *testing.T, pub message.Publisher, sub message.Subscriber) { @@ -1254,7 +1261,7 @@ func PublishSimpleMessages(t *testing.T, messagesCount int, publisher message.Pu for i := 0; i < messagesCount; i++ { id := watermill.NewUUID() - msg := message.NewMessage(id, nil) + msg := message.NewMessage(id, []byte("x")) messagesToPublish = append(messagesToPublish, msg) err := publishWithRetry(publisher, topicName, msg) @@ -1286,7 +1293,7 @@ func AddSimpleMessagesParallel(t *testing.T, messagesCount int, publisher messag for i := 0; i < messagesCount; i++ { id := watermill.NewUUID() - msg := message.NewMessage(id, nil) + msg := message.NewMessage(id, []byte("x")) messagesToPublish = append(messagesToPublish, msg) publishMsg <- msg