From 858254b0d387c50c3f0eb4b9aca027e7885c8dda Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Tue, 31 Oct 2023 11:06:42 -0700 Subject: [PATCH] test(pubsub): add retry guard to ordering key test (#8765) --- pubsub/integration_test.go | 209 ++++++++++++++++++------------------- 1 file changed, 104 insertions(+), 105 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index f801c2793bf7..9ac1d057725a 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1209,118 +1209,117 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() - topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) - if err != nil { - t.Fatal(err) - } - defer topic.Delete(ctx) - defer topic.Stop() - exists, err := topic.Exists(ctx) - if err != nil { - t.Fatal(err) - } - if !exists { - t.Fatalf("topic %v should exist, but it doesn't", topic) - } - var sub *Subscription - if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ - Topic: topic, - EnableMessageOrdering: true, - }); err != nil { - t.Fatal(err) - } - defer sub.Delete(ctx) - exists, err = sub.Exists(ctx) - if err != nil { - t.Fatal(err) - } - if !exists { - t.Fatalf("subscription %s should exist, but it doesn't", sub.ID()) - } - - topic.PublishSettings.DelayThreshold = time.Second - topic.EnableMessageOrdering = true - - inFile, err := os.Open("testdata/publish.csv") - if err != nil { - t.Fatal(err) - } - defer inFile.Close() - - mu := sync.Mutex{} - var publishData []testutil2.OrderedKeyMsg - var receiveData []testutil2.OrderedKeyMsg - // Keep track of duplicate messages to avoid negative waitgroup counter. - receiveSet := make(map[string]struct{}) - - wg := sync.WaitGroup{} - scanner := bufio.NewScanner(inFile) - for scanner.Scan() { - line := scanner.Text() - // TODO: use strings.ReplaceAll once we only support 1.11+. - line = strings.Replace(line, "\"", "", -1) - parts := strings.Split(line, ",") - key := parts[0] - msg := parts[1] - publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg}) - topic.Publish(ctx, &Message{ - Data: []byte(msg), - OrderingKey: key, - }) - wg.Add(1) - } - if err := scanner.Err(); err != nil { - t.Fatal(err) - } - - receiveDone := make(chan struct{}) - ctx, cancel := context.WithCancel(ctx) - go func() { - if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { - mu.Lock() - defer mu.Unlock() - // Messages are deduped using the data field, since in this case all - // messages are unique. - if _, ok := receiveSet[string(msg.Data)]; ok { - return - } - receiveSet[string(msg.Data)] = struct{}{} - receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)}) - wg.Done() - msg.Ack() + testutil.Retry(t, 2, 0, func(r *testutil.R) { + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) + if err != nil { + r.Errorf("createTopicWithRetry err: %v", err) + } + defer topic.Delete(ctx) + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + r.Errorf("topic.Exists err: %v", err) + } + if !exists { + r.Errorf("topic %v should exist, but it doesn't", topic) + } + var sub *Subscription + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ + Topic: topic, + EnableMessageOrdering: true, }); err != nil { - if c := status.Code(err); c != codes.Canceled { - t.Error(err) - } + r.Errorf("creteSubWithRetry err: %v", err) + } + defer sub.Delete(ctx) + exists, err = sub.Exists(ctx) + if err != nil { + r.Errorf("sub.Exists err: %v", err) + } + if !exists { + r.Errorf("subscription %s should exist, but it doesn't", sub.ID()) } - close(receiveDone) - }() - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() + topic.PublishSettings.DelayThreshold = time.Second + topic.EnableMessageOrdering = true - select { - case <-done: - cancel() - case <-time.After(5 * time.Minute): - t.Fatal("timed out after 5m waiting for all messages to be received") - } + inFile, err := os.Open("testdata/publish.csv") + if err != nil { + r.Errorf("os.Open err: %v", err) + } + defer inFile.Close() + + mu := sync.Mutex{} + var publishData []testutil2.OrderedKeyMsg + var receiveData []testutil2.OrderedKeyMsg + // Keep track of duplicate messages to avoid negative waitgroup counter. + receiveSet := make(map[string]struct{}) + + wg := sync.WaitGroup{} + scanner := bufio.NewScanner(inFile) + for scanner.Scan() { + line := scanner.Text() + // TODO: use strings.ReplaceAll once we only support 1.11+. + line = strings.Replace(line, "\"", "", -1) + parts := strings.Split(line, ",") + key := parts[0] + msg := parts[1] + publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg}) + res := topic.Publish(ctx, &Message{ + Data: []byte(msg), + OrderingKey: key, + }) + go func() { + _, err := res.Get(ctx) + if err != nil { + // Can't fail inside goroutine, so just log the error. + r.Logf("publish error for message(%s): %v", msg, err) + } + }() + wg.Add(1) + } + if err := scanner.Err(); err != nil { + r.Errorf("scanner.Err(): %v", err) + } - mu.Lock() - defer mu.Unlock() - if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil { - t.Fatalf("VerifyKeyOrdering error: %v", err) - } + go func() { + if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { + mu.Lock() + defer mu.Unlock() + // Messages are deduped using the data field, since in this case all + // messages are unique. + if _, ok := receiveSet[string(msg.Data)]; ok { + r.Logf("received duplicate message: %s", msg.Data) + return + } + receiveSet[string(msg.Data)] = struct{}{} + receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)}) + wg.Done() + msg.Ack() + }); err != nil { + if c := status.Code(err); c != codes.Canceled { + r.Errorf("status.Code(err) got: %v, want cancelled", err) + } + } + }() - select { - case <-receiveDone: - case <-time.After(5 * time.Minute): - t.Fatal("timed out after 5m waiting for receive to exit") - } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Minute): + r.Errorf("timed out after 2m waiting for all messages to be received") + } + + mu.Lock() + defer mu.Unlock() + if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil { + r.Errorf("VerifyKeyOrdering error: %v", err) + } + }) } func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {