Skip to content

Commit

Permalink
test(pubsub): dedupe by message data in ordering keys json test (#8526)
Browse files Browse the repository at this point in the history
* test(pubsub): dedupe by message data in ordering keys json test

* switch cmp.Diff order so the error logging makes sense

* switch to using t.Logf
  • Loading branch information
hongalex authored Sep 11, 2023
1 parent a4b2cfb commit 84888bf
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
20 changes: 14 additions & 6 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,11 +1263,17 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
key := parts[0]
msg := parts[1]
publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
topic.Publish(ctx, &Message{
ID: msg,
r := topic.Publish(ctx, &Message{
Data: []byte(msg),
OrderingKey: key,
})
go func() {
_, err := r.Get(ctx)
if err != nil {
// Can't fail inside goroutine, so just log the error.
t.Logf("publish error for message(%s): %v", msg, err)
}
}()
wg.Add(1)
}
if err := scanner.Err(); err != nil {
Expand All @@ -1276,15 +1282,17 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {

go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
mu.Lock()
defer mu.Unlock()
if _, ok := receiveSet[msg.ID]; ok {
// Messages are deduped using the data field, since in this case all
// messages are unique.
if _, ok := receiveSet[string(msg.Data)]; ok {
return
}
receiveSet[msg.ID] = struct{}{}
receiveSet[string(msg.Data)] = struct{}{}
receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)})
wg.Done()
msg.Ack()
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
Expand All @@ -1307,7 +1315,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
mu.Lock()
defer mu.Unlock()
if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
t.Fatalf("CreateTopic error: %v", err)
t.Fatalf("VerifyKeyOrdering error: %v", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pubsub/internal/testutil/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func VerifyKeyOrdering(publishData, receiveData []OrderedKeyMsg) error {
return fmt.Errorf("saw key %s, but we never published this key", k)
}

if diff := cmp.Diff(pb, rd); diff != "" {
if diff := cmp.Diff(rd, pb); diff != "" {
return fmt.Errorf("%s: got -, want +\n\t%s", k, diff)
}
}
Expand Down

0 comments on commit 84888bf

Please sign in to comment.