From bbc2680e4a244fbf8ea0a312bf3393a34dbe5e0c Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 5 Sep 2023 20:43:08 +0000 Subject: [PATCH 1/3] test(pubsub): dedupe by message data in ordering keys json test --- pubsub/integration_test.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index d78ba332fe91..2e1bb1901e24 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io/ioutil" + "log" "os" "strings" "sync" @@ -1264,11 +1265,17 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { key := parts[0] msg := parts[1] publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg}) - topic.Publish(ctx, &Message{ - ID: msg, + r := topic.Publish(ctx, &Message{ Data: []byte(msg), OrderingKey: key, }) + go func() { + _, err := r.Get(ctx) + if err != nil { + // Can't fail inside goroutine, so just log the error + log.Printf("publish error for message(%s): %v", msg, err) + } + }() wg.Add(1) } if err := scanner.Err(); err != nil { @@ -1277,15 +1284,17 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { go func() { if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { - defer msg.Ack() mu.Lock() defer mu.Unlock() - if _, ok := receiveSet[msg.ID]; ok { + // Messages are deduped using the data field, since in this case all + // messages are unique. + if _, ok := receiveSet[string(msg.Data)]; ok { return } - receiveSet[msg.ID] = struct{}{} + receiveSet[string(msg.Data)] = struct{}{} receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)}) wg.Done() + msg.Ack() }); err != nil { if c := status.Code(err); c != codes.Canceled { t.Error(err) @@ -1308,7 +1317,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { mu.Lock() defer mu.Unlock() if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil { - t.Fatalf("CreateTopic error: %v", err) + t.Fatalf("VerifyKeyOrdering error: %v", err) } } From e3ecd0a4f60e5e14051458a50b60831d446397c4 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 5 Sep 2023 20:46:05 +0000 Subject: [PATCH 2/3] switch cmp.Diff order so the error logging makes sense --- pubsub/internal/testutil/verifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/internal/testutil/verifier.go b/pubsub/internal/testutil/verifier.go index 66ab4fa6baca..dd8225488055 100644 --- a/pubsub/internal/testutil/verifier.go +++ b/pubsub/internal/testutil/verifier.go @@ -61,7 +61,7 @@ func VerifyKeyOrdering(publishData, receiveData []OrderedKeyMsg) error { return fmt.Errorf("saw key %s, but we never published this key", k) } - if diff := cmp.Diff(pb, rd); diff != "" { + if diff := cmp.Diff(rd, pb); diff != "" { return fmt.Errorf("%s: got -, want +\n\t%s", k, diff) } } From cf070d6d52080577125f473ac44acdafa4a96c37 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 11 Sep 2023 20:53:11 +0000 Subject: [PATCH 3/3] switch to using t.Logf --- pubsub/integration_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 2e1bb1901e24..3b3f3dccf472 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "io/ioutil" - "log" "os" "strings" "sync" @@ -1272,8 +1271,8 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { go func() { _, err := r.Get(ctx) if err != nil { - // Can't fail inside goroutine, so just log the error - log.Printf("publish error for message(%s): %v", msg, err) + // Can't fail inside goroutine, so just log the error. + t.Logf("publish error for message(%s): %v", msg, err) } }() wg.Add(1)