diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 960e8240da4d..ed05e832bdb1 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1263,11 +1263,17 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { key := parts[0] msg := parts[1] publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg}) - topic.Publish(ctx, &Message{ - ID: msg, + r := topic.Publish(ctx, &Message{ Data: []byte(msg), OrderingKey: key, }) + go func() { + _, err := r.Get(ctx) + if err != nil { + // Can't fail inside goroutine, so just log the error. + t.Logf("publish error for message(%s): %v", msg, err) + } + }() wg.Add(1) } if err := scanner.Err(); err != nil { @@ -1276,15 +1282,17 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { go func() { if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { - defer msg.Ack() mu.Lock() defer mu.Unlock() - if _, ok := receiveSet[msg.ID]; ok { + // Messages are deduped using the data field, since in this case all + // messages are unique. + if _, ok := receiveSet[string(msg.Data)]; ok { return } - receiveSet[msg.ID] = struct{}{} + receiveSet[string(msg.Data)] = struct{}{} receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)}) wg.Done() + msg.Ack() }); err != nil { if c := status.Code(err); c != codes.Canceled { t.Error(err) @@ -1307,7 +1315,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { mu.Lock() defer mu.Unlock() if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil { - t.Fatalf("CreateTopic error: %v", err) + t.Fatalf("VerifyKeyOrdering error: %v", err) } } diff --git a/pubsub/internal/testutil/verifier.go b/pubsub/internal/testutil/verifier.go index 66ab4fa6baca..dd8225488055 100644 --- a/pubsub/internal/testutil/verifier.go +++ b/pubsub/internal/testutil/verifier.go @@ -61,7 +61,7 @@ func VerifyKeyOrdering(publishData, receiveData []OrderedKeyMsg) error { return fmt.Errorf("saw key %s, but we never published this key", k) } - if diff := cmp.Diff(pb, rd); diff != "" { + if diff := cmp.Diff(rd, pb); diff != "" { return fmt.Errorf("%s: got -, want +\n\t%s", k, diff) } }