diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index 901a89e01b25..cb0b0b33ed38 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -1,14 +1,10 @@ # Release History -## 1.0.3 (Unreleased) - -### Features Added - -### Breaking Changes +## 1.0.3 (2024-01-16) ### Bugs Fixed -### Other Changes +- Processor distributes partitions optimally, which would result in idle or over-assigned processors. (PR#22153) ## 1.0.2 (2023-11-07) diff --git a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go index e0b0f50fa65c..55c3650d4869 100644 --- a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go +++ b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go @@ -4,6 +4,7 @@ package azeventhubs import ( "context" + "sort" "strings" "sync" "testing" @@ -277,6 +278,10 @@ func (cps *testCheckpointStore) ListOwnership(ctx context.Context, fullyQualifie ownerships = append(ownerships, v) } + sort.Slice(ownerships, func(i, j int) bool { + return ownerships[i].PartitionID < ownerships[j].PartitionID + }) + return ownerships, nil } diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/scenarios-matrix.yaml b/sdk/messaging/azeventhubs/internal/eh/stress/scenarios-matrix.yaml index 84ea05ef7951..8d83d95134e9 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/scenarios-matrix.yaml +++ b/sdk/messaging/azeventhubs/internal/eh/stress/scenarios-matrix.yaml @@ -61,3 +61,7 @@ matrix: prefetch: 0 verbose: "" sleepAfter: "5m" + multibalance: + testTarget: multibalance + rounds: 20 + verbose: "" diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/stress-test-resources.json b/sdk/messaging/azeventhubs/internal/eh/stress/stress-test-resources.json index 8b5482ed8cad..5880de174968 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/stress-test-resources.json +++ b/sdk/messaging/azeventhubs/internal/eh/stress/stress-test-resources.json @@ -52,9 +52,15 @@ "apiVersion": "[variables('apiVersion')]", "name": "[variables('authorizationName')]", "location": "[variables('location')]", - "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"], + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]" + ], "properties": { - "rights": ["Listen", "Manage", "Send"] + "rights": [ + "Listen", + "Manage", + "Send" + ] } }, { @@ -62,7 +68,9 @@ "apiVersion": "[variables('apiVersion')]", "name": "[variables('eventHubNameFull')]", "location": "[variables('location')]", - "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"], + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]" + ], "properties": { "messageRetentionInDays": 7, "partitionCount": 32 @@ -73,7 +81,9 @@ "apiVersion": "[variables('apiVersion')]", "name": "[concat(variables('namespaceName'), '/default')]", "location": "[variables('location')]", - "dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"], + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]" + ], "properties": { "defaultAction": "Deny", "virtualNetworkRules": [], @@ -127,13 +137,15 @@ "name": "[concat('default/', variables('containerName'))]", "type": "blobServices/containers", "apiVersion": "[variables('storageApiVersion')]", - "dependsOn": ["[variables('storageAccountName')]"] + "dependsOn": [ + 
"[variables('storageAccountName')]" + ] } ] }, ], "outputs": { - "EVENTHUB_NAME": { + "EVENTHUB_NAME_STRESS": { "type": "string", "value": "[variables('eventHubName')]" }, diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/stress.go b/sdk/messaging/azeventhubs/internal/eh/stress/stress.go index 81ab35a93964..54ea103925ab 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/stress.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/stress.go @@ -17,6 +17,8 @@ func main() { fn func(ctx context.Context) error }{ {name: "batch", fn: tests.BatchStressTester}, + {name: "balance", fn: tests.BalanceTester}, + {name: "multibalance", fn: tests.MultiBalanceTester}, {name: "processor", fn: tests.ProcessorStressTester}, } diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/templates/stress-test-job.yaml b/sdk/messaging/azeventhubs/internal/eh/stress/templates/stress-test-job.yaml index 4a6ff57cc6a0..305d0a512d05 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/templates/stress-test-job.yaml +++ b/sdk/messaging/azeventhubs/internal/eh/stress/templates/stress-test-job.yaml @@ -22,7 +22,11 @@ spec: - > set -ex; mkdir -p "$DEBUG_SHARE"; + {{if eq .Stress.testTarget "multibalance" }} + /app/stress "{{.Stress.testTarget}}" "-rounds" "{{.Stress.rounds}}" "{{.Stress.verbose}}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; + {{else}} /app/stress "{{.Stress.testTarget}}" "-rounds" "{{.Stress.rounds}}" "-prefetch" "{{.Stress.prefetch}}" "{{.Stress.verbose}}" "-sleepAfter" "{{.Stress.sleepAfter}}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; + {{end}} # Pulls the image on pod start, always. We tend to push to the same image and tag over and over again # when iterating, so this is a must. imagePullPolicy: Always diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/balance_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/balance_tester.go new file mode 100644 index 000000000000..0cfe64362c72 --- /dev/null +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/balance_tester.go @@ -0,0 +1,401 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tests + +import ( + "context" + "encoding/json" + "errors" + "flag" + "fmt" + golog "log" + "os" + "strings" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" +) + +const ( + EventBalanceTest log.Event = "balance.test" +) + +// BalanceTester checks that we can properly distribute partitions and +// maintain it over time. 
+func BalanceTester(ctx context.Context) error { + fs := flag.NewFlagSet("", flag.ContinueOnError) + + numProcessors := fs.Int("processors", 32, "The # of processor instances to run") + strategy := fs.String("strategy", string(azeventhubs.ProcessorStrategyBalanced), "The partition acquisition strategy to use (balanced, greedy)") + + if err := fs.Parse(os.Args[2:]); err != nil { + return err + } + + log.SetEvents(EventBalanceTest, azeventhubs.EventConsumer) + log.SetListener(func(e log.Event, s string) { + // we don't have structured logging in our SDK so this is the most reasonable way to + // see what partitions each processor + if e == azeventhubs.EventConsumer && + !strings.Contains(s, "Asked for") { + return + } + + golog.Printf("[%s] %s", e, s) + }) + + return balanceTesterImpl(ctx, *numProcessors, azeventhubs.ProcessorStrategy(*strategy)) +} + +func balanceTesterImpl(ctx context.Context, numProcessors int, strategy azeventhubs.ProcessorStrategy) error { + testData, err := newStressTestData("balancetester", map[string]string{ + "processors": fmt.Sprintf("%d", numProcessors), + "strategy": string(strategy), + }) + + if err != nil { + return err + } + + args := balanceTester{ + stressTestData: testData, + numProcessors: numProcessors, + strategy: strategy, + } + + args.numPartitions, err = func(ctx context.Context) (int, error) { + client, err := azeventhubs.NewProducerClientFromConnectionString(args.ConnectionString, args.HubName, nil) + + if err != nil { + return 0, err + } + + defer func() { + _ = client.Close(ctx) + }() + + props, err := client.GetEventHubProperties(ctx, nil) + + if err != nil { + return 0, err + } + + return len(props.PartitionIDs), nil + }(ctx) + + if err != nil { + return err + } + + return args.Run(ctx) +} + +type balanceTester struct { + *stressTestData + + strategy azeventhubs.ProcessorStrategy + numProcessors int + numPartitions int +} + +func (bt *balanceTester) Run(ctx context.Context) error { + defer bt.cleanupContainer() + + wg := sync.WaitGroup{} + failuresChan := make(chan error, bt.numProcessors) + + testCtx, cancelTest := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancelTest() + + mu := sync.Mutex{} + var lastBalanceError error + startTime := time.Now() + + go func() { + balancedCount := 0 + var firstBalance time.Duration + + Loop: + // poll every 5 seconds to see if the checkpoint store is "balanced" (all owners + // own a fair-share of the partitions). 
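+ // The run is considered successful after three consecutive balanced polls; any
+ // unbalanced poll resets balancedCount back to zero.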
+ for { + select { + case <-ctx.Done(): + break Loop + case <-time.After(5 * time.Second): + err := bt.checkBalance(ctx) + + if ibErr := (unbalancedError)(nil); errors.As(err, &ibErr) { + mu.Lock() + lastBalanceError = err + mu.Unlock() + + log.Writef(EventBalanceTest, "Balance not achieved, resetting balancedCount: %s", ibErr) + balancedCount = 0 + + bt.TC.TrackEvent("Unbalanced", map[string]string{ + "Message": ibErr.Error(), + }) + continue + } else if err != nil { + mu.Lock() + lastBalanceError = err + mu.Unlock() + + bt.TC.TrackException(err) + break Loop + } + + if balancedCount == 0 { + firstBalance = time.Since(startTime) + } + + balancedCount++ + log.Writef(EventBalanceTest, "Balanced, with %d consecutive checks", balancedCount) + + bt.TC.TrackEvent("Balanced", map[string]string{ + "Count": fmt.Sprintf("%d", balancedCount), + "DurationSeconds": fmt.Sprintf("%d", firstBalance/time.Second), + }) + + if balancedCount == 3 { + log.Writef(EventBalanceTest, "Balanced at %d seconds (approx)", firstBalance/time.Second) + + mu.Lock() + lastBalanceError = nil + mu.Unlock() + + cancelTest() + break Loop + } + } + } + }() + + for i := 0; i < bt.numProcessors; i++ { + wg.Add(1) + + go func(i int) { + defer wg.Done() + + if err := bt.process(testCtx, fmt.Sprintf("proc%02d", i)); err != nil { + failuresChan <- err + cancelTest() + return + } + }(i) + } + + wg.Wait() + close(failuresChan) + cancelTest() + + // any errors? + for err := range failuresChan { + bt.TC.TrackException(err) + fmt.Printf("ERROR: %s\n", err) + return err + } + + mu.Lock() + err := lastBalanceError + mu.Unlock() + + if err != nil { + bt.TC.TrackException(err) + return err + } + + log.Writef(EventBalanceTest, "BALANCED") + return nil +} + +func (bt *balanceTester) process(ctx context.Context, name string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + client, err := azeventhubs.NewConsumerClientFromConnectionString(bt.ConnectionString, bt.HubName, azeventhubs.DefaultConsumerGroup, nil) + + if err != nil { + return err + } + + defer func() { _ = client.Close(ctx) }() + + blobClient, err := azblob.NewClientFromConnectionString(bt.StorageConnectionString, nil) + + if err != nil { + return err + } + + containerClient := blobClient.ServiceClient().NewContainerClient(bt.runID) + + if _, err := containerClient.Create(ctx, nil); err != nil { + if !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) { + return err + } + } + + blobStore, err := checkpoints.NewBlobStore(containerClient, nil) + + if err != nil { + return err + } + + processor, err := azeventhubs.NewProcessor(client, blobStore, &azeventhubs.ProcessorOptions{ + LoadBalancingStrategy: bt.strategy, + }) + + if err != nil { + return err + } + + ch := make(chan struct{}) + go func() { + defer close(ch) + for { + pc := processor.NextPartitionClient(ctx) + + if pc == nil { + break + } + + go bt.keepAlive(ctx, pc) + } + }() + + err = processor.Run(ctx) + cancel() + <-ch + + return err +} + +func (bt *balanceTester) keepAlive(ctx context.Context, pc *azeventhubs.ProcessorPartitionClient) { + defer func() { + _ = pc.Close(context.Background()) + }() + + for { + if _, err := pc.ReceiveEvents(ctx, 1, nil); err != nil { + break + } + } +} + +type unbalancedError error + +// checkBalance queries the checkpoint store. +// It returns `nil` if no error occurred and the checkpoint store was balanced. 
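+// "Balanced" means every running processor owns its fair share of the partitions, as computed
+// by summarizeBalance from the current ownership list.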
+// If the checkpoint store is NOT balanced it returns an unbalancedError +func (bt *balanceTester) checkBalance(ctx context.Context) error { + blobClient, err := azblob.NewClientFromConnectionString(bt.StorageConnectionString, nil) + + if err != nil { + return err + } + + blobStore, err := checkpoints.NewBlobStore( + blobClient.ServiceClient().NewContainerClient(bt.runID), + nil) + + if err != nil { + return err + } + + ownerships, err := blobStore.ListOwnership(ctx, bt.Namespace, bt.HubName, azeventhubs.DefaultConsumerGroup, nil) + + if err != nil { + return err + } + + stats := bt.summarizeBalance(ownerships) + + if !stats.Balanced { + return unbalancedError(fmt.Errorf("unbalanced: %s", stats.String())) + } + + return nil +} + +func (bt *balanceTester) cleanupContainer() { + blobClient, err := azblob.NewClientFromConnectionString(bt.StorageConnectionString, nil) + + if err != nil { + return + } + + containerClient := blobClient.ServiceClient().NewContainerClient(bt.runID) + + _, _ = containerClient.Delete(context.Background(), nil) +} + +func (bt *balanceTester) summarizeBalance(ownerships []azeventhubs.Ownership) stats { + counts := map[string]int{} + + for _, o := range ownerships { + counts[o.OwnerID]++ + } + + // now let's make sure everyone only took a fair share + min := bt.numPartitions / bt.numProcessors + max := min + + if bt.numPartitions%bt.numProcessors != 0 { + max += 1 + } + + tooFew := 0 + tooMany := 0 + + for _, owned := range counts { + if owned < min { + tooFew++ + } else if owned > max { + tooMany++ + } + } + + sum := 0 + + for _, v := range counts { + sum += v + } + + return stats{ + Processors: fmt.Sprintf("%d/%d", len(counts), bt.numProcessors), + Partitions: fmt.Sprintf("%d/%d", sum, bt.numPartitions), + OwnTooFew: tooFew, + OwnTooMany: tooMany, + Balanced: len(counts) == bt.numProcessors && + sum == bt.numPartitions && + tooFew == 0 && + tooMany == 0, + Raw: counts, + } +} + +type stats struct { + Processors string + Partitions string + OwnTooFew int + OwnTooMany int + Balanced bool + Raw map[string]int +} + +func (s *stats) String() string { + jsonBytes, err := json.Marshal(s) + + if err != nil { + panic(err) + } + + return string(jsonBytes) +} diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go index 3b23c3a9815f..8b974a05bb9d 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go @@ -47,7 +47,8 @@ func getBatchTesterParams(args []string) (batchTesterParams, error) { fs.IntVar(¶ms.paddingBytes, "padding", 1024, "Extra number of bytes to add into each message body") fs.StringVar(¶ms.partitionID, "partition", "0", "Partition ID to send and receive events to") fs.IntVar(¶ms.maxDeadlineExceeded, "maxtimeouts", 10, "Number of consecutive receive timeouts allowed before quitting") - fs.BoolVar(¶ms.enableVerboseLogging, "verbose", false, "enable verbose azure sdk logging") + enableVerboseLoggingFn := addVerboseLoggingFlag(fs, nil) + sleepAfterFn := addSleepAfterFlag(fs) if err := fs.Parse(os.Args[2:]); err != nil { @@ -55,6 +56,7 @@ func getBatchTesterParams(args []string) (batchTesterParams, error) { return batchTesterParams{}, err } + enableVerboseLoggingFn() params.prefetch = int32(*prefetch) if params.rounds == -1 { @@ -85,7 +87,7 @@ func BatchStressTester(ctx context.Context) error { defer params.sleepAfterFn() - testData, err := 
newStressTestData("batch", params.enableVerboseLogging, map[string]string{ + testData, err := newStressTestData("batch", map[string]string{ "BatchDuration": params.batchDuration.String(), "BatchSize": fmt.Sprintf("%d", params.batchSize), "NumToSend": fmt.Sprintf("%d", params.numToSend), @@ -93,7 +95,6 @@ func BatchStressTester(ctx context.Context) error { "PartitionId": params.partitionID, "Prefetch": fmt.Sprintf("%d", params.prefetch), "Rounds": fmt.Sprintf("%d", params.rounds), - "Verbose": fmt.Sprintf("%t", params.enableVerboseLogging), "MaxDeadlineExceeded": fmt.Sprintf("%d", params.maxDeadlineExceeded), }) @@ -155,16 +156,15 @@ func BatchStressTester(ctx context.Context) error { } type batchTesterParams struct { - numToSend int - paddingBytes int - partitionID string - batchSize int - batchDuration time.Duration - rounds int64 - prefetch int32 - maxDeadlineExceeded int - enableVerboseLogging bool - sleepAfterFn func() + numToSend int + paddingBytes int + partitionID string + batchSize int + batchDuration time.Duration + rounds int64 + prefetch int32 + maxDeadlineExceeded int + sleepAfterFn func() } func consumeForBatchTester(ctx context.Context, round int64, cc *azeventhubs.ConsumerClient, sp azeventhubs.StartPosition, params batchTesterParams, testData *stressTestData) error { diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/multi_balance_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/multi_balance_tester.go new file mode 100644 index 000000000000..d2fd99f5c316 --- /dev/null +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/multi_balance_tester.go @@ -0,0 +1,85 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tests + +import ( + "context" + "flag" + "fmt" + golog "log" + "os" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" +) + +// MultiBalanceTester runs the BalanceTest multiple times against different +// combinations of partition acquisition strategy and number of processors. +// +// NOTE: this test assumes that the Event Hub you're using has 32 partitions. 
+func MultiBalanceTester(ctx context.Context) error { + fs := flag.NewFlagSet("", flag.ContinueOnError) + + rounds := fs.Int("rounds", 1, "Number of rounds to run") + + if err := fs.Parse(os.Args[2:]); err != nil { + return err + } + + ch := make(chan string, 10000) + + log.SetEvents(EventBalanceTest, azeventhubs.EventConsumer) + log.SetListener(func(e log.Event, s string) { + if e == azeventhubs.EventConsumer && + !strings.Contains(s, "Asked for") { + return + } + + ch <- fmt.Sprintf("[%s] %s", e, s) + }) + + go func() { + for { + select { + case s := <-ch: + golog.Println(s) + case <-ctx.Done(): + break + } + } + }() + + for i := 0; i < *rounds; i++ { + testData := []struct { + Processors int + Strategy azeventhubs.ProcessorStrategy + }{ + {32, azeventhubs.ProcessorStrategyGreedy}, + {31, azeventhubs.ProcessorStrategyGreedy}, + {16, azeventhubs.ProcessorStrategyGreedy}, + {5, azeventhubs.ProcessorStrategyGreedy}, + {1, azeventhubs.ProcessorStrategyGreedy}, + + {32, azeventhubs.ProcessorStrategyBalanced}, + {31, azeventhubs.ProcessorStrategyBalanced}, + {16, azeventhubs.ProcessorStrategyBalanced}, + {5, azeventhubs.ProcessorStrategyBalanced}, + {1, azeventhubs.ProcessorStrategyBalanced}, + } + + for _, td := range testData { + log.Writef(EventBalanceTest, "----- BEGIN[%d]: %s, %d processors -----", i, td.Strategy, td.Processors) + + if err := balanceTesterImpl(ctx, td.Processors, td.Strategy); err != nil { + log.Writef(EventBalanceTest, "----- END[%d]: FAIL: %s, %d processors, %s -----", i, td.Strategy, td.Processors, err) + return err + } + + log.Writef(EventBalanceTest, "----- END[%d]: %s, %d processors -----", i, td.Strategy, td.Processors) + } + } + + return nil +} diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go index 9e9a48032059..13485da36fb7 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go @@ -53,7 +53,7 @@ func newProcessorStressTest(args []string) (*processorStressTest, error) { eventsPerRound := fs.Int("send", 5000, "Number of events to send per round") rounds := fs.Int64("rounds", 100, "Number of rounds. -1 means math.MaxInt64") prefetch := fs.Int("prefetch", 0, "Number of events to set for the prefetch. Negative numbers disable prefetch altogether. 
0 uses the default for the package.") - enableVerboseLogging := fs.Bool("verbose", false, "enable verbose azure sdk logging") + enableVerboseLoggingFn := addVerboseLoggingFlag(fs, nil) sleepAfterFn := addSleepAfterFlag(fs) if err := fs.Parse(args); err != nil { @@ -61,11 +61,13 @@ func newProcessorStressTest(args []string) (*processorStressTest, error) { return nil, err } + enableVerboseLoggingFn() + if *rounds == -1 { *rounds = math.MaxInt64 } - testData, err := newStressTestData("infiniteprocessor", *enableVerboseLogging, map[string]string{ + testData, err := newStressTestData("infiniteprocessor", map[string]string{ "Processors": fmt.Sprintf("%d", numProcessors), "EventsPerRound": fmt.Sprintf("%d", eventsPerRound), "Rounds": fmt.Sprintf("%d", rounds), diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go index 5e5e210c9245..b5ee828ec37d 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go @@ -60,7 +60,7 @@ func (td *stressTestData) Close() { type logf func(format string, v ...any) -func newStressTestData(name string, verbose bool, baggage map[string]string) (*stressTestData, error) { +func newStressTestData(name string, baggage map[string]string) (*stressTestData, error) { td := &stressTestData{ name: name, runID: fmt.Sprintf("%s-%d", name, time.Now().UnixNano()), @@ -80,7 +80,7 @@ func newStressTestData(name string, verbose bool, baggage map[string]string) (*s variables := map[string]*string{ "EVENTHUB_CONNECTION_STRING": &td.ConnectionString, - "EVENTHUB_NAME": &td.HubName, + "EVENTHUB_NAME_STRESS": &td.HubName, "CHECKPOINTSTORE_STORAGE_CONNECTION_STRING": &td.StorageConnectionString, } @@ -98,10 +98,6 @@ func newStressTestData(name string, verbose bool, baggage map[string]string) (*s return nil, fmt.Errorf("missing environment variables (%s)", strings.Join(missing, ",")) } - if verbose { - enableVerboseLogging() - } - tc, err := loadAppInsights() if err != nil { @@ -377,13 +373,6 @@ func closeOrPanic(closeable interface { } } -func enableVerboseLogging() { - //azlog.SetEvents(azeventhubs.EventAuth, azeventhubs.EventConn, azeventhubs.EventConsumer) - azlog.SetListener(func(e azlog.Event, s string) { - log.Printf("[%s] %s", e, s) - }) -} - func addSleepAfterFlag(fs *flag.FlagSet) func() { var durationStr string fs.StringVar(&durationStr, "sleepAfter", "0m", "Time to sleep after test completes") @@ -403,3 +392,33 @@ func addSleepAfterFlag(fs *flag.FlagSet) func() { } } } + +func addVerboseLoggingFlag(fs *flag.FlagSet, customLogFn func(verbose string, e azlog.Event, s string)) func() { + verbose := fs.String("v", "", "Enable verbose SDK logging. Valid values are test or sdk or all") + + logFn := func(e azlog.Event, s string) { + log.Printf("[%s] %s", e, s) + } + + if customLogFn != nil { + logFn = func(e azlog.Event, s string) { + customLogFn(*verbose, e, s) + } + } + + return func() { + switch *verbose { + case "": + case "test": + azlog.SetEvents(EventBalanceTest) + azlog.SetListener(logFn) + case "sdk": + azlog.SetEvents(EventBalanceTest, azeventhubs.EventConsumer, azeventhubs.EventProducer) + azlog.SetListener(logFn) + case "all": + azlog.SetListener(logFn) + default: + fmt.Printf("%s is not a valid logging value. 
Valid values are test or sdk or all", *verbose) + } + } +} diff --git a/sdk/messaging/azeventhubs/processor_load_balancer.go b/sdk/messaging/azeventhubs/processor_load_balancer.go index 7ab002a421f5..62ec59a88e49 100644 --- a/sdk/messaging/azeventhubs/processor_load_balancer.go +++ b/sdk/messaging/azeventhubs/processor_load_balancer.go @@ -46,16 +46,17 @@ type loadBalancerInfo struct { // it contains _all_ the partitions for that particular consumer. aboveMax []Ownership - // maxAllowed is the maximum number of partitions a consumer should have - // If partitions do not divide evenly this will be the "theoretical" max - // with the assumption that this particular consumer will get an extra - // partition. + // claimMorePartitions is true when we should try to claim more partitions + // because we're under the limit, or we're in a situation where we could claim + // one extra partition. + claimMorePartitions bool + + // maxAllowed is the maximum number of partitions that other processors are allowed + // to own during this round. It can change based on how many partitions we own and whether + // an 'extra' partition is allowed (ie, partitions %owners is not 0). Look at + // [processorLoadBalancer.getAvailablePartitions] for more details. maxAllowed int - // extraPartitionPossible is true if the partitions cannot split up evenly - // amongst all the known consumers. - extraPartitionPossible bool - raw []Ownership } @@ -68,45 +69,22 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [ return nil, err } - claimMorePartitions := true - - if len(lbinfo.current) >= lbinfo.maxAllowed { - // - I have _exactly_ the right amount - // or - // - I have too many. We expect to have some stolen from us, but we'll maintain - // ownership for now. - claimMorePartitions = false - log.Writef(EventConsumer, "Owns %d/%d, no more needed", len(lbinfo.current), lbinfo.maxAllowed) - } else if lbinfo.extraPartitionPossible && len(lbinfo.current) == lbinfo.maxAllowed-1 { - // In the 'extraPartitionPossible' scenario, some consumers will have an extra partition - // since things don't divide up evenly. We're one under the max, which means we _might_ - // be able to claim another one. - // - // We will attempt to grab _one_ more but only if there are free partitions available - // or if one of the consumers has more than the max allowed. 
- claimMorePartitions = len(lbinfo.unownedOrExpired) > 0 || len(lbinfo.aboveMax) > 0 - log.Writef(EventConsumer, "Unowned/expired: %d, above max: %d, need to claim: %t", - len(lbinfo.unownedOrExpired), - len(lbinfo.aboveMax), - claimMorePartitions) - } - ownerships := lbinfo.current - if claimMorePartitions { + if lbinfo.claimMorePartitions { switch lb.strategy { case ProcessorStrategyGreedy: - log.Writef(EventConsumer, "Using greedy strategy to claim partitions") + log.Writef(EventConsumer, "[%s] Using greedy strategy to claim partitions", lb.details.ClientID) ownerships = lb.greedyLoadBalancer(ctx, lbinfo) case ProcessorStrategyBalanced: - log.Writef(EventConsumer, "Using balanced strategy to claim partitions") + log.Writef(EventConsumer, "[%s] Using balanced strategy to claim partitions", lb.details.ClientID) o := lb.balancedLoadBalancer(ctx, lbinfo) if o != nil { ownerships = append(lbinfo.current, *o) } default: - return nil, fmt.Errorf("invalid load balancing strategy '%s'", lb.strategy) + return nil, fmt.Errorf("[%s] invalid load balancing strategy '%s'", lb.details.ClientID, lb.strategy) } } @@ -133,8 +111,11 @@ func partitionsForOwnerships(all []Ownership) string { return strings.Join(parts, ",") } -// getAvailablePartitions finds all partitions that are either completely unowned _or_ -// their ownership is stale. +// getAvailablePartitions looks through the ownership list (using the checkpointstore.ListOwnership) and evaluates: +// - Whether we should claim more partitions +// - Which partitions are available - unowned/relinquished, expired or processors that own more than the maximum allowed. +// +// Load balancing happens in individual functions func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, partitionIDs []string) (loadBalancerInfo, error) { log.Writef(EventConsumer, "[%s] Listing ownership for %s/%s/%s", lb.details.ClientID, lb.details.FullyQualifiedNamespace, lb.details.EventHubName, lb.details.ConsumerGroup) @@ -170,7 +151,6 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par } numExpired := len(unownedOrExpired) - log.Writef(EventConsumer, "Expired: %d", numExpired) // add in all the unowned partitions for _, partID := range partitionIDs { @@ -189,12 +169,13 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par }) } - log.Writef(EventConsumer, "Unowned: %d", len(unownedOrExpired)-numExpired) + minRequired := len(partitionIDs) / len(groupedByOwner) + maxAllowed := minRequired + allowExtraPartition := len(partitionIDs)%len(groupedByOwner) > 0 - maxAllowed := len(partitionIDs) / len(groupedByOwner) - hasRemainder := len(partitionIDs)%len(groupedByOwner) > 0 - - if hasRemainder { + // only allow owners to keep extra partitions if we've already met our minimum bar. Otherwise + // above the minimum is fair game. + if allowExtraPartition && len(groupedByOwner[lb.details.ClientID]) >= minRequired { maxAllowed += 1 } @@ -210,13 +191,41 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par } } + claimMorePartitions := true + current := groupedByOwner[lb.details.ClientID] + + if len(current) >= maxAllowed { + // - I have _exactly_ the right amount + // or + // - I have too many. We expect to have some stolen from us, but we'll maintain + // ownership for now. 
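+ // (Owners holding more than maxAllowed end up in aboveMax, so other processors will
+ // steal the surplus partitions on their later load-balancing cycles.)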
+ claimMorePartitions = false + } else if allowExtraPartition && len(current) == maxAllowed-1 { + // In the 'allowExtraPartition' scenario, some consumers will have an extra partition + // since things don't divide up evenly. We're one under the max, which means we _might_ + // be able to claim another one. + // + // We will attempt to grab _one_ more but only if there are free partitions available + // or if one of the consumers has more than the max allowed. + claimMorePartitions = len(unownedOrExpired) > 0 || len(aboveMax) > 0 + } + + log.Writef(EventConsumer, "[%s] claimMorePartitions: %t, owners: %d, current: %d, unowned: %d, expired: %d, above: %d", + lb.details.ClientID, + claimMorePartitions, + len(groupedByOwner), + len(current), + len(unownedOrExpired)-numExpired, + numExpired, + len(aboveMax)) + return loadBalancerInfo{ - current: groupedByOwner[lb.details.ClientID], - unownedOrExpired: unownedOrExpired, - aboveMax: aboveMax, - maxAllowed: maxAllowed, - extraPartitionPossible: hasRemainder, - raw: ownerships, + current: current, + unownedOrExpired: unownedOrExpired, + aboveMax: aboveMax, + claimMorePartitions: claimMorePartitions, + raw: ownerships, + maxAllowed: maxAllowed, }, nil } diff --git a/sdk/messaging/azeventhubs/processor_load_balancers_test.go b/sdk/messaging/azeventhubs/processor_load_balancers_test.go index 40df95fc14d4..c6c3611797b0 100644 --- a/sdk/messaging/azeventhubs/processor_load_balancers_test.go +++ b/sdk/messaging/azeventhubs/processor_load_balancers_test.go @@ -4,7 +4,10 @@ package azeventhubs import ( "context" + "fmt" + "math/rand" "sort" + "strconv" "testing" "time" @@ -283,15 +286,15 @@ func TestProcessorLoadBalancers_AnyStrategy_StealsToBalance(t *testing.T) { func TestProcessorLoadBalancers_InvalidStrategy(t *testing.T) { cps := newCheckpointStoreForTest() - lb := newProcessorLoadBalancer(cps, newTestConsumerDetails("does not matter"), "", time.Hour) + lb := newProcessorLoadBalancer(cps, newTestConsumerDetails("clientid"), "", time.Hour) ownerships, err := lb.LoadBalance(context.Background(), []string{"0"}) require.Nil(t, ownerships) - require.EqualError(t, err, "invalid load balancing strategy ''") + require.EqualError(t, err, "[clientid] invalid load balancing strategy ''") - lb = newProcessorLoadBalancer(cps, newTestConsumerDetails("does not matter"), "super-greedy", time.Hour) + lb = newProcessorLoadBalancer(cps, newTestConsumerDetails("clientid"), "super-greedy", time.Hour) ownerships, err = lb.LoadBalance(context.Background(), []string{"0"}) require.Nil(t, ownerships) - require.EqualError(t, err, "invalid load balancing strategy 'super-greedy'") + require.EqualError(t, err, "[clientid] invalid load balancing strategy 'super-greedy'") } func TestProcessorLoadBalancers_AnyStrategy_GrabRelinquishedPartition(t *testing.T) { @@ -328,6 +331,86 @@ func TestProcessorLoadBalancers_AnyStrategy_GrabRelinquishedPartition(t *testing } } +func TestUnit_ProcessorLoadBalancer_Balanced(t *testing.T) { + for _, td := range []string{"abc", "abbc", "abbcc"} { + for _, ownerID := range []string{"a", "b", "c"} { + t.Run(fmt.Sprintf("Layout %q, with owner %q", td, ownerID), func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, td, ownerID) + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.False(t, lbi.claimMorePartitions) + require.Empty(t, lbi.aboveMax) + }) + } + } + + t.Run("balanced with unequal ownership", func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, "aaaabbb", "a") + lbi, err 
:= lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.False(t, lbi.claimMorePartitions) + require.Equal(t, 4, lbi.maxAllowed) + require.Empty(t, lbi.aboveMax) + + lb, parts = createLoadBalancerForTest(t, "aaaabbb", "b") + lbi, err = lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.False(t, lbi.claimMorePartitions) + require.Equal(t, 4, lbi.maxAllowed) + require.Empty(t, lbi.aboveMax) + }) +} + +func TestUnit_ProcessorLoadBalancer_Unbalanced(t *testing.T) { + t.Run("new owner enters the field", func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, "abb", "c") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 1, lbi.maxAllowed) + require.Equal(t, ".c.", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("deficit, single partition", func(t *testing.T) { + // existing owner, needs to steal a partition + lb, parts := createLoadBalancerForTest(t, "aaaabb", "b") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 3, lbi.maxAllowed) + require.Equal(t, "b...bb", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("deficit, multiple partitions", func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, "aaabbbcccd", "d") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 2, lbi.maxAllowed) + require.Equal(t, "d........d", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("deficit, multiple owners", func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, "aaabbbccde", "d") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 2, lbi.maxAllowed) + require.Equal(t, "d.......d.", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("can grab an extra partition", func(t *testing.T) { + // this tests when we have met our minimum but another processor actually owns + // more than it should (even accounting for the extra partition). + lb, parts := createLoadBalancerForTest(t, "aaaabbc", "b") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 3, lbi.maxAllowed) + require.Equal(t, "b...bb.", greedyLoadBalance(lb, lbi, parts)) + }) +} + func mapToStrings[T any](src []T, fn func(t T) string) []string { var dest []string @@ -418,3 +501,86 @@ func groupBy[T any](src []T, fn func(t T) string) map[string][]T { return dest } + +// ownershipsAsString converts the list of owners into a string where each character represents a partition +// with the owner or a '.' if nobody owns a partition. +// +// Examples: +// - '..' => two partitions, both unowned. +// - 'a.' => first partition is owned by 'a', second partition is unowned. +// - 'ab' => first partition is owned by 'a', second partition owned by 'b' +func ownershipsAsString(owners []Ownership, total int) string { + bits := make([]rune, total) + + for i := 0; i < len(bits); i++ { + bits[i] = '.' 
+ } + + for _, o := range owners { + idx, err := strconv.ParseInt(o.PartitionID, 10, 32) + + if err != nil { + panic(err) + } + + bits[int(idx)] = ([]rune(o.OwnerID))[0] + } + + return string(bits) +} + +// createLoadBalancerForTest creates a load balancer from initialState, seeding it with a consistent random +// seed so it's safe for unit tests. +// +// initialState - each index in the string represents the owner for that index, and indexes are 1:1 with partition IDs. +// Example: "ab" => a owns partition 0, b owns partition 1 +// clientID - the client ID for this load balancer. Changing this lets you test from each owner's perspective. +func createLoadBalancerForTest(t *testing.T, initialState string, clientID string) (*processorLoadBalancer, []string) { + var ownerships []Ownership + var partitions []string + + details := consumerClientDetails{ + FullyQualifiedNamespace: "fakens.servicebus.windows.net", + ConsumerGroup: DefaultConsumerGroup, + EventHubName: "eventhub", + ClientID: clientID, + } + + for partNum, rn := range initialState { + partID := fmt.Sprintf("%d", partNum) + partitions = append(partitions, partID) + + ownerships = append(ownerships, Ownership{ + FullyQualifiedNamespace: details.FullyQualifiedNamespace, + ConsumerGroup: details.ConsumerGroup, + EventHubName: details.EventHubName, + PartitionID: partID, + OwnerID: string(rn), + }) + } + + cps := newCheckpointStoreForTest() + + claimed, err := cps.ClaimOwnership(context.Background(), ownerships, nil) + require.NoError(t, err) + require.Equal(t, len(partitions), len(claimed), "All partitions are owned") + + lb := newProcessorLoadBalancer(cps, details, + ProcessorStrategyBalanced, // we're not claiming for this test so this is ignored. + time.Hour) // no partitions should be considered expired. + + lb.rnd = rand.New(rand.NewSource(1)) + + return lb, partitions +} + +// greedyLoadBalance runs the greedy load balancer algorithm in a way that makes its results deterministic. +func greedyLoadBalance(lb *processorLoadBalancer, lbi loadBalancerInfo, partitionIDs []string) string { + // make the results deterministic by using the same seed for RNG (taken care of in [createLoadBalancerForTest]) + // and ensuring a consistent ordering of this slice since the greedyLoadBalancer generates permutations from it.
+ sort.Slice(lbi.aboveMax, func(i, j int) bool { + return lbi.aboveMax[i].PartitionID < lbi.aboveMax[j].PartitionID + }) + + return ownershipsAsString(lb.greedyLoadBalancer(context.Background(), lbi), len(partitionIDs)) +} diff --git a/sdk/messaging/azeventhubs/processor_unit_test.go b/sdk/messaging/azeventhubs/processor_unit_test.go index 950ec342d18e..8f47799a0fcc 100644 --- a/sdk/messaging/azeventhubs/processor_unit_test.go +++ b/sdk/messaging/azeventhubs/processor_unit_test.go @@ -38,7 +38,7 @@ func TestUnit_Processor_loadBalancing(t *testing.T) { // which means that we get to claim them all require.Empty(t, lbinfo.aboveMax) require.Empty(t, lbinfo.current) - require.False(t, lbinfo.extraPartitionPossible) + require.True(t, lbinfo.claimMorePartitions) require.Equal(t, 3, lbinfo.maxAllowed, "only 1 possible owner (us), so we're allowed all the available partitions") expectedOwnerships := []Ownership{ @@ -89,8 +89,8 @@ func TestUnit_Processor_loadBalancing(t *testing.T) { require.NoError(t, err) require.Empty(t, lbinfo.aboveMax) require.Empty(t, lbinfo.current) - require.True(t, lbinfo.extraPartitionPossible, "divvying 3 partitions amongst 2 processors") - require.Equal(t, 2, lbinfo.maxAllowed, "now we're divvying up 3 partitions between 2 processors. At _most_ you can have min+1") + require.True(t, lbinfo.claimMorePartitions) + require.Equal(t, 1, lbinfo.maxAllowed, "the max is now 1 (instead of 2) because _our_ processor doesn't own enough") // there are two available partition ownerships - we should be getting one of them. newProcessorOwnerships, err := secondProcessor.lb.LoadBalance(context.Background(), allPartitionIDs)
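For reference, a minimal sketch (not part of this diff) of how an application would pick between the two load-balancing strategies exercised by the balance testers above. It only uses APIs that already appear in this diff (NewConsumerClientFromConnectionString, checkpoints.NewBlobStore, NewProcessor, NextPartitionClient, ReceiveEvents); the connection strings, event hub name and container name are placeholders to substitute with your own resources.

package main

import (
	"context"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func main() {
	// Placeholder values - substitute your own Event Hubs and Storage resources.
	const (
		eventHubConnStr = "<event hub namespace connection string>"
		eventHubName    = "<event hub name>"
		storageConnStr  = "<storage account connection string>"
		containerName   = "<existing blob container for checkpoints>"
	)

	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(eventHubConnStr, eventHubName, azeventhubs.DefaultConsumerGroup, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = consumerClient.Close(context.TODO()) }()

	blobClient, err := azblob.NewClientFromConnectionString(storageConnStr, nil)
	if err != nil {
		log.Fatal(err)
	}

	checkpointStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(containerName), nil)
	if err != nil {
		log.Fatal(err)
	}

	// LoadBalancingStrategy selects between the strategies tested above:
	// ProcessorStrategyBalanced (the default) claims at most one partition per cycle,
	// while ProcessorStrategyGreedy claims every partition it is entitled to in one cycle.
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, &azeventhubs.ProcessorOptions{
		LoadBalancingStrategy: azeventhubs.ProcessorStrategyBalanced,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Hand out partition clients as the processor acquires ownership.
	go func() {
		for {
			pc := processor.NextPartitionClient(context.TODO())
			if pc == nil {
				return // processor has stopped
			}

			go func() {
				defer func() { _ = pc.Close(context.TODO()) }()

				for {
					// In real code you would typically use a per-call timeout so this
					// returns periodically, and checkpoint after processing.
					events, err := pc.ReceiveEvents(context.TODO(), 100, nil)
					if err != nil {
						return
					}

					log.Printf("partition %s: received %d events", pc.PartitionID(), len(events))
				}
			}()
		}
	}()

	// Run blocks until the passed context is cancelled.
	if err := processor.Run(context.TODO()); err != nil {
		log.Fatal(err)
	}
}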