[azeventhubs] Prep for release, fixing tests (#19364)
This is just a simple update for QA - no functional fixes to the SDK itself - plus it pushes the changelog date forward since the release got delayed.

- Added some more tests for recovery. There were some incidents in the stress cluster, but they turned out to be red herrings. They did reveal that more granular tests should exist, and we now have them for both link-level and connection-level recovery.
- Fixed a bug in one of the stress tests. We kept getting an odd (but non-fatal) error about the connection idling out as the test tore down (after passing!). It turns out the error was accurate: the batch test is a little unusual in that it doesn't send events continually - it sends them once at the beginning of the test, and the consumers just keep rewinding to pick them up again. So the producer client just needed to be closed once its part in the test was done (see the sketch below).

Fixes #19220
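
As an illustration of the fix described in the second bullet, here is a minimal, hypothetical sketch of the pattern (not code from this commit): send the test data once up front, then close the producer client immediately instead of deferring the close to the end of the run. It reuses the azeventhubs calls that appear in the tests below; the namespace and hub names are placeholders.

```go
package main

import (
    "context"
    "log"

    "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
    // Placeholder values - substitute a real namespace and event hub.
    const eventHubNamespace = "<your-namespace>.servicebus.windows.net"
    const eventHubName = "<your-event-hub>"

    cred, err := azidentity.NewDefaultAzureCredential(nil)
    if err != nil {
        log.Fatalf("failed to create credential: %s", err)
    }

    producerClient, err := azeventhubs.NewProducerClient(eventHubNamespace, eventHubName, cred, nil)
    if err != nil {
        log.Fatalf("failed to create producer client: %s", err)
    }

    // Send the events the rest of the test will (re)read, once, up front.
    batch, err := producerClient.NewEventDataBatch(context.Background(), nil)
    if err != nil {
        log.Fatalf("failed to create batch: %s", err)
    }

    if err := batch.AddEventData(&azeventhubs.EventData{Body: []byte("hello")}, nil); err != nil {
        log.Fatalf("failed to add event: %s", err)
    }

    if err := producerClient.SendEventBatch(context.Background(), batch, nil); err != nil {
        log.Fatalf("failed to send batch: %s", err)
    }

    // The producer's part is done - close it now rather than deferring the close,
    // so its connection doesn't sit idle (and eventually idle out) while the
    // consumer-only portion of the test runs.
    if err := producerClient.Close(context.Background()); err != nil {
        log.Fatalf("failed to close producer client: %s", err)
    }

    // ... the long consumer-only phase would follow here ...
}
```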
richardpark-msft authored Oct 17, 2022
1 parent aeb29e7 commit e50c6cc
Showing 3 changed files with 297 additions and 3 deletions.
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/CHANGELOG.md
@@ -1,6 +1,6 @@
# Release History

-## 0.2.0 (2022-10-13)
+## 0.2.0 (2022-10-17)

### Features Added

294 changes: 294 additions & 0 deletions sdk/messaging/azeventhubs/consumer_client_internal_test.go
@@ -0,0 +1,294 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azeventhubs

import (
"context"
"fmt"
"log"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test"
"github.com/stretchr/testify/require"
)

func TestConsumerClient_Recovery(t *testing.T) {
testParams := test.GetConnectionParamsForTest(t)

// Uncomment to see the entire recovery playbook run.
// test.EnableStdoutLogging()

dac, err := azidentity.NewDefaultAzureCredential(nil)
require.NoError(t, err)

// Overview:
// 1. Send two events to each partition.
// 2. Receive one event from each partition. This'll ensure the links are live.
// 3. Dig into the client to get access to its connection and shut it off.
// 4. Try again, everything should recover.
producerClient, err := NewProducerClient(testParams.EventHubNamespace, testParams.EventHubName, dac, nil)
require.NoError(t, err)

ehProps, err := producerClient.GetEventHubProperties(context.Background(), nil)
require.NoError(t, err)

// trim the partition list down so the test executes in reasonable time.
ehProps.PartitionIDs = ehProps.PartitionIDs[0:3] // min for testing is 3 partitions anyway

type sendResult struct {
PartitionID string
OffsetBefore int64
}

sendResults := make([]sendResult, len(ehProps.PartitionIDs))
wg := sync.WaitGroup{}

log.Printf("1. sending 2 events to %d partitions", len(ehProps.PartitionIDs))

for i, pid := range ehProps.PartitionIDs {
wg.Add(1)

go func(i int, pid string) {
defer wg.Done()

partProps, err := producerClient.GetPartitionProperties(context.Background(), pid, nil)
require.NoError(t, err)

batch, err := producerClient.NewEventDataBatch(context.Background(), &EventDataBatchOptions{
PartitionID: &pid,
})
require.NoError(t, err)

require.NoError(t, batch.AddEventData(&EventData{
Body: []byte(fmt.Sprintf("event 1 for partition %s", pid)),
}, nil))

require.NoError(t, batch.AddEventData(&EventData{
Body: []byte(fmt.Sprintf("event 2 for partition %s", pid)),
}, nil))

err = producerClient.SendEventBatch(context.Background(), batch, nil)
require.NoError(t, err)

sendResults[i] = sendResult{PartitionID: pid, OffsetBefore: partProps.LastEnqueuedOffset}
}(i, pid)
}

wg.Wait()

test.RequireClose(t, producerClient)

// now we'll receive an event (so we know each partition client is alive)
// each partition actually has two offsets.
consumerClient, err := NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, DefaultConsumerGroup, dac, nil)
require.NoError(t, err)

partitionClients := make([]*PartitionClient, len(sendResults))

log.Printf("2. receiving the first event for each partition")

for i, sr := range sendResults {
wg.Add(1)

go func(i int, sr sendResult) {
defer wg.Done()

partClient, err := consumerClient.NewPartitionClient(sr.PartitionID, &PartitionClientOptions{
StartPosition: StartPosition{Inclusive: false, Offset: &sr.OffsetBefore},
Prefetch: -1,
})
require.NoError(t, err)

partitionClients[i] = partClient

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

events, err := partClient.ReceiveEvents(ctx, 1, nil)
require.NoError(t, err)
require.EqualValues(t, 1, len(events))
require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body))
}(i, sr)
}

wg.Wait()

defer test.RequireClose(t, consumerClient)

log.Printf("3. closing connection, which will force recovery for each partition client so they can read the next event")

// now we'll close the internal connection, simulating a connection break
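// (passing false here presumably means the close isn't permanent, so the clients can rebuild the connection - the receives below verify that recovery happens)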
require.NoError(t, consumerClient.namespace.Close(context.Background(), false))

var best int64
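// best counts how many partition clients manage to receive their second event after the connection was broken.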

log.Printf("4. try to read the second event, which force clients to recover")

// and try to receive the second event for each client
for i, pc := range partitionClients {
wg.Add(1)

go func(i int, pc *PartitionClient) {
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

events, err := pc.ReceiveEvents(ctx, 1, nil)
require.NoError(t, err)
require.EqualValues(t, 1, len(events))
require.Equal(t, fmt.Sprintf("event 2 for partition %s", sendResults[i].PartitionID), string(events[0].Body))

atomic.AddInt64(&best, 1)
}(i, pc)
}

wg.Wait()
require.Equal(t, int64(len(ehProps.PartitionIDs)), best)
}

func TestConsumerClient_RecoveryLink(t *testing.T) {
testParams := test.GetConnectionParamsForTest(t)

// Uncomment to see the entire recovery playbook run.
// test.EnableStdoutLogging()

dac, err := azidentity.NewDefaultAzureCredential(nil)
require.NoError(t, err)

// Overview:
// 1. Send two events to each partition.
// 2. Receive one event from each partition. This'll ensure the links are live.
// 3. Dig into each partition client to get access to its AMQP link and close it.
// 4. Try again, each link should recover.
producerClient, err := NewProducerClient(testParams.EventHubNamespace, testParams.EventHubName, dac, nil)
require.NoError(t, err)

ehProps, err := producerClient.GetEventHubProperties(context.Background(), nil)
require.NoError(t, err)

// trim the partition list down so the test executes in reasonable time.
ehProps.PartitionIDs = ehProps.PartitionIDs[0:3] // min for testing is 3 partitions anyway

type sendResult struct {
PartitionID string
OffsetBefore int64
}

sendResults := make([]sendResult, len(ehProps.PartitionIDs))
wg := sync.WaitGroup{}

log.Printf("== 1. sending 2 events to %d partitions ==", len(ehProps.PartitionIDs))

for i, pid := range ehProps.PartitionIDs {
wg.Add(1)

go func(i int, pid string) {
defer wg.Done()

partProps, err := producerClient.GetPartitionProperties(context.Background(), pid, nil)
require.NoError(t, err)

batch, err := producerClient.NewEventDataBatch(context.Background(), &EventDataBatchOptions{
PartitionID: &pid,
})
require.NoError(t, err)

require.NoError(t, batch.AddEventData(&EventData{
Body: []byte(fmt.Sprintf("event 1 for partition %s", pid)),
}, nil))

require.NoError(t, batch.AddEventData(&EventData{
Body: []byte(fmt.Sprintf("event 2 for partition %s", pid)),
}, nil))

err = producerClient.SendEventBatch(context.Background(), batch, nil)
require.NoError(t, err)

sendResults[i] = sendResult{PartitionID: pid, OffsetBefore: partProps.LastEnqueuedOffset}
}(i, pid)
}

wg.Wait()

test.RequireClose(t, producerClient)

// now we'll receive an event (so we know each partition client is alive)
// each partition actually has two offsets.
consumerClient, err := NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, DefaultConsumerGroup, dac, nil)
require.NoError(t, err)

partitionClients := make([]*PartitionClient, len(sendResults))

log.Printf("== 2. receiving the first event for each partition == ")

for i, sr := range sendResults {
wg.Add(1)

go func(i int, sr sendResult) {
defer wg.Done()

partClient, err := consumerClient.NewPartitionClient(sr.PartitionID, &PartitionClientOptions{
StartPosition: StartPosition{Inclusive: false, Offset: &sr.OffsetBefore},
Prefetch: -1,
})
require.NoError(t, err)

partitionClients[i] = partClient

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

events, err := partClient.ReceiveEvents(ctx, 1, nil)
require.NoError(t, err)
require.EqualValues(t, 1, len(events))
require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body))
}(i, sr)
}

wg.Wait()

defer test.RequireClose(t, consumerClient)

var best int64

log.Printf("== 3. Closing links, but leaving connection intact ==")

for i, pc := range partitionClients {
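// Reach into the partition client's internals and close just its AMQP receiver link - the connection itself stays up, so this exercises link-level recovery only.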
links := pc.links.(*internal.Links[amqpwrap.AMQPReceiverCloser])
lwid, err := links.GetLink(context.Background(), sendResults[i].PartitionID)
require.NoError(t, err)
require.NoError(t, lwid.Link.Close(context.Background()))
}

log.Printf("== 4. try to read the second event, which force clients to recover ==")

// and try to receive the second event for each client
for i, pc := range partitionClients {
wg.Add(1)

go func(i int, pc *PartitionClient) {
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

events, err := pc.ReceiveEvents(ctx, 1, nil)
require.NoError(t, err)
require.EqualValues(t, 1, len(events))
require.Equal(t, fmt.Sprintf("event 2 for partition %s", sendResults[i].PartitionID), string(events[0].Body))

atomic.AddInt64(&best, 1)
}(i, pc)
}

wg.Wait()
require.Equal(t, int64(len(ehProps.PartitionIDs)), best)
}
@@ -105,8 +105,6 @@ func BatchStressTester(ctx context.Context) error {
return err
}

-defer closeOrPanic(producerClient)
-
// we're going to read (and re-read these events over and over in our tests)
log.Printf("Sending messages to partition %s", params.partitionID)

@@ -118,6 +116,8 @@
testData: testData,
})

+closeOrPanic(producerClient)
+
if err != nil {
log.Fatalf("Failed to send events to partition %s: %s", params.partitionID, err)
}
