From e50c6ccfba49d8c98ef0a10041d9540116010865 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Mon, 17 Oct 2022 10:07:12 -0700 Subject: [PATCH] [azeventhubs] Prep for release, fixing tests (#19364) This is just a simple update for QA, no fixes and also pushing the changelog date forward since it got delayed. - Added in some more tests for recovery. There were some incidents in the stress cluster but they turned out to be red herrings. They did reveal that some more granular tests should exist and now we have them for both link and connection level recovery. - Fixed a bug in one of the stress tests. We kept getting an odd (but non-fatal) error as the test tore down (after passing!) about the connection idling out. Turns out it was accurate - the batch test is a little unusual in that it doesn't send events continually - it sends them once at the beginning of the test and the consumers just keep rewinding to pick them up again. So the producer client just needed to be closed after it's part in the test was done. Fixes #19220 --- sdk/messaging/azeventhubs/CHANGELOG.md | 2 +- .../consumer_client_internal_test.go | 294 ++++++++++++++++++ .../eh/stress/tests/batch_stress_tester.go | 4 +- 3 files changed, 297 insertions(+), 3 deletions(-) create mode 100644 sdk/messaging/azeventhubs/consumer_client_internal_test.go diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index e679d6dc3254..d7ce6f7a990e 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -1,6 +1,6 @@ # Release History -## 0.2.0 (2022-10-13) +## 0.2.0 (2022-10-17) ### Features Added diff --git a/sdk/messaging/azeventhubs/consumer_client_internal_test.go b/sdk/messaging/azeventhubs/consumer_client_internal_test.go new file mode 100644 index 000000000000..736fbbd7317d --- /dev/null +++ b/sdk/messaging/azeventhubs/consumer_client_internal_test.go @@ -0,0 +1,294 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azeventhubs + +import ( + "context" + "fmt" + "log" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test" + "github.com/stretchr/testify/require" +) + +func TestConsumerClient_Recovery(t *testing.T) { + testParams := test.GetConnectionParamsForTest(t) + + // Uncomment to see the entire recovery playbook run. + // test.EnableStdoutLogging() + + dac, err := azidentity.NewDefaultAzureCredential(nil) + require.NoError(t, err) + + // Overview: + // 1. Send one event per partition + // 2. Receive one event per partition. This'll ensure the links are live. + // 3. Grub into the client to get access to it's connection and shut it off. + // 4. Try again, everything should recover. + producerClient, err := NewProducerClient(testParams.EventHubNamespace, testParams.EventHubName, dac, nil) + require.NoError(t, err) + + ehProps, err := producerClient.GetEventHubProperties(context.Background(), nil) + require.NoError(t, err) + + // trim the partition list down so the test executes in resonable time. + ehProps.PartitionIDs = ehProps.PartitionIDs[0:3] // min for testing is 3 partitions anyways + + type sendResult struct { + PartitionID string + OffsetBefore int64 + } + + sendResults := make([]sendResult, len(ehProps.PartitionIDs)) + wg := sync.WaitGroup{} + + log.Printf("1. sending 2 events to %d partitions", len(ehProps.PartitionIDs)) + + for i, pid := range ehProps.PartitionIDs { + wg.Add(1) + + go func(i int, pid string) { + defer wg.Done() + + partProps, err := producerClient.GetPartitionProperties(context.Background(), pid, nil) + require.NoError(t, err) + + batch, err := producerClient.NewEventDataBatch(context.Background(), &EventDataBatchOptions{ + PartitionID: &pid, + }) + require.NoError(t, err) + + require.NoError(t, batch.AddEventData(&EventData{ + Body: []byte(fmt.Sprintf("event 1 for partition %s", pid)), + }, nil)) + + require.NoError(t, batch.AddEventData(&EventData{ + Body: []byte(fmt.Sprintf("event 2 for partition %s", pid)), + }, nil)) + + err = producerClient.SendEventBatch(context.Background(), batch, nil) + require.NoError(t, err) + + sendResults[i] = sendResult{PartitionID: pid, OffsetBefore: partProps.LastEnqueuedOffset} + }(i, pid) + } + + wg.Wait() + + test.RequireClose(t, producerClient) + + // now we'll receive an event (so we know each partition client is alive) + // each partition actually has two offsets. + consumerClient, err := NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, DefaultConsumerGroup, dac, nil) + require.NoError(t, err) + + partitionClients := make([]*PartitionClient, len(sendResults)) + + log.Printf("2. receiving the first event for each partition") + + for i, sr := range sendResults { + wg.Add(1) + + go func(i int, sr sendResult) { + defer wg.Done() + + partClient, err := consumerClient.NewPartitionClient(sr.PartitionID, &PartitionClientOptions{ + StartPosition: StartPosition{Inclusive: false, Offset: &sr.OffsetBefore}, + Prefetch: -1, + }) + require.NoError(t, err) + + partitionClients[i] = partClient + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + events, err := partClient.ReceiveEvents(ctx, 1, nil) + require.NoError(t, err) + require.EqualValues(t, 1, len(events)) + require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body)) + }(i, sr) + } + + wg.Wait() + + defer test.RequireClose(t, consumerClient) + + log.Printf("3. closing connection, which will force recovery for each partition client so they can read the next event") + + // now we'll close the internal connection, simulating a connection break + require.NoError(t, consumerClient.namespace.Close(context.Background(), false)) + + var best int64 + + log.Printf("4. try to read the second event, which force clients to recover") + + // and try to receive the second event for each client + for i, pc := range partitionClients { + wg.Add(1) + + go func(i int, pc *PartitionClient) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + events, err := pc.ReceiveEvents(ctx, 1, nil) + require.NoError(t, err) + require.EqualValues(t, 1, len(events)) + require.Equal(t, fmt.Sprintf("event 2 for partition %s", sendResults[i].PartitionID), string(events[0].Body)) + + atomic.AddInt64(&best, 1) + }(i, pc) + } + + wg.Wait() + require.Equal(t, int64(len(ehProps.PartitionIDs)), best) +} + +func TestConsumerClient_RecoveryLink(t *testing.T) { + testParams := test.GetConnectionParamsForTest(t) + + // Uncomment to see the entire recovery playbook run. + // test.EnableStdoutLogging() + + dac, err := azidentity.NewDefaultAzureCredential(nil) + require.NoError(t, err) + + // Overview: + // 1. Send one event per partition + // 2. Receive one event per partition. This'll ensure the links are live. + // 3. Grub into the client to get access to it's connection and shut it off. + // 4. Try again, everything should recover. + producerClient, err := NewProducerClient(testParams.EventHubNamespace, testParams.EventHubName, dac, nil) + require.NoError(t, err) + + ehProps, err := producerClient.GetEventHubProperties(context.Background(), nil) + require.NoError(t, err) + + // trim the partition list down so the test executes in resonable time. + ehProps.PartitionIDs = ehProps.PartitionIDs[0:3] // min for testing is 3 partitions anyways + + type sendResult struct { + PartitionID string + OffsetBefore int64 + } + + sendResults := make([]sendResult, len(ehProps.PartitionIDs)) + wg := sync.WaitGroup{} + + log.Printf("== 1. sending 2 events to %d partitions ==", len(ehProps.PartitionIDs)) + + for i, pid := range ehProps.PartitionIDs { + wg.Add(1) + + go func(i int, pid string) { + defer wg.Done() + + partProps, err := producerClient.GetPartitionProperties(context.Background(), pid, nil) + require.NoError(t, err) + + batch, err := producerClient.NewEventDataBatch(context.Background(), &EventDataBatchOptions{ + PartitionID: &pid, + }) + require.NoError(t, err) + + require.NoError(t, batch.AddEventData(&EventData{ + Body: []byte(fmt.Sprintf("event 1 for partition %s", pid)), + }, nil)) + + require.NoError(t, batch.AddEventData(&EventData{ + Body: []byte(fmt.Sprintf("event 2 for partition %s", pid)), + }, nil)) + + err = producerClient.SendEventBatch(context.Background(), batch, nil) + require.NoError(t, err) + + sendResults[i] = sendResult{PartitionID: pid, OffsetBefore: partProps.LastEnqueuedOffset} + }(i, pid) + } + + wg.Wait() + + test.RequireClose(t, producerClient) + + // now we'll receive an event (so we know each partition client is alive) + // each partition actually has two offsets. + consumerClient, err := NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, DefaultConsumerGroup, dac, nil) + require.NoError(t, err) + + partitionClients := make([]*PartitionClient, len(sendResults)) + + log.Printf("== 2. receiving the first event for each partition == ") + + for i, sr := range sendResults { + wg.Add(1) + + go func(i int, sr sendResult) { + defer wg.Done() + + partClient, err := consumerClient.NewPartitionClient(sr.PartitionID, &PartitionClientOptions{ + StartPosition: StartPosition{Inclusive: false, Offset: &sr.OffsetBefore}, + Prefetch: -1, + }) + require.NoError(t, err) + + partitionClients[i] = partClient + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + events, err := partClient.ReceiveEvents(ctx, 1, nil) + require.NoError(t, err) + require.EqualValues(t, 1, len(events)) + require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body)) + }(i, sr) + } + + wg.Wait() + + defer test.RequireClose(t, consumerClient) + + var best int64 + + log.Printf("== 3. Closing links, but leaving connection intact ==") + + for i, pc := range partitionClients { + links := pc.links.(*internal.Links[amqpwrap.AMQPReceiverCloser]) + lwid, err := links.GetLink(context.Background(), sendResults[i].PartitionID) + require.NoError(t, err) + require.NoError(t, lwid.Link.Close(context.Background())) + } + + log.Printf("== 4. try to read the second event, which force clients to recover ==") + + // and try to receive the second event for each client + for i, pc := range partitionClients { + wg.Add(1) + + go func(i int, pc *PartitionClient) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + events, err := pc.ReceiveEvents(ctx, 1, nil) + require.NoError(t, err) + require.EqualValues(t, 1, len(events)) + require.Equal(t, fmt.Sprintf("event 2 for partition %s", sendResults[i].PartitionID), string(events[0].Body)) + + atomic.AddInt64(&best, 1) + }(i, pc) + } + + wg.Wait() + require.Equal(t, int64(len(ehProps.PartitionIDs)), best) +} diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go index 47f8d0cad522..41b239f385ac 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go @@ -105,8 +105,6 @@ func BatchStressTester(ctx context.Context) error { return err } - defer closeOrPanic(producerClient) - // we're going to read (and re-read these events over and over in our tests) log.Printf("Sending messages to partition %s", params.partitionID) @@ -118,6 +116,8 @@ func BatchStressTester(ctx context.Context) error { testData: testData, }) + closeOrPanic(producerClient) + if err != nil { log.Fatalf("Failed to send events to partition %s: %s", params.partitionID, err) }