Skip to content

Commit

Permalink
test(pubsub): add retry guard to ordering key test (#8765)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex authored Oct 31, 2023
1 parent 6e77611 commit 858254b
Showing 1 changed file with 104 additions and 105 deletions.
209 changes: 104 additions & 105 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,118 +1209,117 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
var sub *Subscription
if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
}); err != nil {
t.Fatal(err)
}
defer sub.Delete(ctx)
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
}

topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true

inFile, err := os.Open("testdata/publish.csv")
if err != nil {
t.Fatal(err)
}
defer inFile.Close()

mu := sync.Mutex{}
var publishData []testutil2.OrderedKeyMsg
var receiveData []testutil2.OrderedKeyMsg
// Keep track of duplicate messages to avoid negative waitgroup counter.
receiveSet := make(map[string]struct{})

wg := sync.WaitGroup{}
scanner := bufio.NewScanner(inFile)
for scanner.Scan() {
line := scanner.Text()
// TODO: use strings.ReplaceAll once we only support 1.11+.
line = strings.Replace(line, "\"", "", -1)
parts := strings.Split(line, ",")
key := parts[0]
msg := parts[1]
publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
topic.Publish(ctx, &Message{
Data: []byte(msg),
OrderingKey: key,
})
wg.Add(1)
}
if err := scanner.Err(); err != nil {
t.Fatal(err)
}

receiveDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
mu.Lock()
defer mu.Unlock()
// Messages are deduped using the data field, since in this case all
// messages are unique.
if _, ok := receiveSet[string(msg.Data)]; ok {
return
}
receiveSet[string(msg.Data)] = struct{}{}
receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)})
wg.Done()
msg.Ack()
testutil.Retry(t, 2, 0, func(r *testutil.R) {
topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
if err != nil {
r.Errorf("createTopicWithRetry err: %v", err)
}
defer topic.Delete(ctx)
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
r.Errorf("topic.Exists err: %v", err)
}
if !exists {
r.Errorf("topic %v should exist, but it doesn't", topic)
}
var sub *Subscription
if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
r.Errorf("creteSubWithRetry err: %v", err)
}
defer sub.Delete(ctx)
exists, err = sub.Exists(ctx)
if err != nil {
r.Errorf("sub.Exists err: %v", err)
}
if !exists {
r.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
close(receiveDone)
}()

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true

select {
case <-done:
cancel()
case <-time.After(5 * time.Minute):
t.Fatal("timed out after 5m waiting for all messages to be received")
}
inFile, err := os.Open("testdata/publish.csv")
if err != nil {
r.Errorf("os.Open err: %v", err)
}
defer inFile.Close()

mu := sync.Mutex{}
var publishData []testutil2.OrderedKeyMsg
var receiveData []testutil2.OrderedKeyMsg
// Keep track of duplicate messages to avoid negative waitgroup counter.
receiveSet := make(map[string]struct{})

wg := sync.WaitGroup{}
scanner := bufio.NewScanner(inFile)
for scanner.Scan() {
line := scanner.Text()
// TODO: use strings.ReplaceAll once we only support 1.11+.
line = strings.Replace(line, "\"", "", -1)
parts := strings.Split(line, ",")
key := parts[0]
msg := parts[1]
publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
res := topic.Publish(ctx, &Message{
Data: []byte(msg),
OrderingKey: key,
})
go func() {
_, err := res.Get(ctx)
if err != nil {
// Can't fail inside goroutine, so just log the error.
r.Logf("publish error for message(%s): %v", msg, err)
}
}()
wg.Add(1)
}
if err := scanner.Err(); err != nil {
r.Errorf("scanner.Err(): %v", err)
}

mu.Lock()
defer mu.Unlock()
if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
t.Fatalf("VerifyKeyOrdering error: %v", err)
}
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
mu.Lock()
defer mu.Unlock()
// Messages are deduped using the data field, since in this case all
// messages are unique.
if _, ok := receiveSet[string(msg.Data)]; ok {
r.Logf("received duplicate message: %s", msg.Data)
return
}
receiveSet[string(msg.Data)] = struct{}{}
receiveData = append(receiveData, testutil2.OrderedKeyMsg{Key: msg.OrderingKey, Data: string(msg.Data)})
wg.Done()
msg.Ack()
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
r.Errorf("status.Code(err) got: %v, want cancelled", err)
}
}
}()

select {
case <-receiveDone:
case <-time.After(5 * time.Minute):
t.Fatal("timed out after 5m waiting for receive to exit")
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
case <-time.After(2 * time.Minute):
r.Errorf("timed out after 2m waiting for all messages to be received")
}

mu.Lock()
defer mu.Unlock()
if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
r.Errorf("VerifyKeyOrdering error: %v", err)
}
})
}

func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
Expand Down

0 comments on commit 858254b

Please sign in to comment.