From f99199ee9630dc2a31ca03e9b2f51b292c3226d4 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Fri, 5 Jan 2024 03:02:17 +0000 Subject: [PATCH] WIP --- .../inmemory_checkpoint_store_test.go | 5 + .../eh/stress/tests/balance_tester.go | 52 ++-- .../eh/stress/tests/batch_stress_tester.go | 2 +- .../eh/stress/tests/multi_balance_tester.go | 53 ++-- .../stress/tests/processor_stress_tester.go | 2 +- .../internal/eh/stress/tests/shared.go | 26 +- .../azeventhubs/processor_load_balancer.go | 55 ++-- .../processor_load_balancers_test.go | 252 ++++++++++-------- 8 files changed, 249 insertions(+), 198 deletions(-) diff --git a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go index e0b0f50fa65c..55c3650d4869 100644 --- a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go +++ b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go @@ -4,6 +4,7 @@ package azeventhubs import ( "context" + "sort" "strings" "sync" "testing" @@ -277,6 +278,10 @@ func (cps *testCheckpointStore) ListOwnership(ctx context.Context, fullyQualifie ownerships = append(ownerships, v) } + sort.Slice(ownerships, func(i, j int) bool { + return ownerships[i].PartitionID < ownerships[j].PartitionID + }) + return ownerships, nil } diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/balance_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/balance_tester.go index 0ec9e7a4846a..c5556ee4c986 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/balance_tester.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/balance_tester.go @@ -9,7 +9,9 @@ import ( "errors" "flag" "fmt" + golog "log" "os" + "strings" "sync" "time" @@ -31,13 +33,20 @@ func BalanceTester(ctx context.Context) error { numProcessors := fs.Int("processors", 32, "The # of processor instances to run") strategy := fs.String("strategy", string(azeventhubs.ProcessorStrategyBalanced), "The partition acquisition strategy to use (balanced, greedy)") - enableVerboseLoggingFn := addVerboseLoggingFlag(fs) if err := fs.Parse(os.Args[2:]); err != nil { return err } - enableVerboseLoggingFn() + log.SetEvents(EventBalanceTest, azeventhubs.EventConsumer) + log.SetListener(func(e log.Event, s string) { + if e == azeventhubs.EventConsumer && + !strings.Contains(s, "Asked for") { + return + } + + golog.Printf("[%s] %s", e, s) + }) return balanceTesterImpl(ctx, *numProcessors, azeventhubs.ProcessorStrategy(*strategy)) } @@ -99,7 +108,7 @@ func (bt *balanceTester) Run(ctx context.Context) error { wg := sync.WaitGroup{} failuresChan := make(chan error, bt.numProcessors) - testCtx, cancelTest := context.WithTimeout(context.Background(), 5*time.Minute) + testCtx, cancelTest := context.WithTimeout(context.Background(), 10*time.Minute) defer cancelTest() mu := sync.Mutex{} @@ -111,6 +120,8 @@ func (bt *balanceTester) Run(ctx context.Context) error { var firstBalance time.Duration Loop: + // poll every 5 seconds to see if the checkpoint store is "balanced" (all owners + // own a fair-share of the partitions). for { select { case <-ctx.Done(): @@ -204,6 +215,9 @@ func (bt *balanceTester) Run(ctx context.Context) error { } func (bt *balanceTester) process(ctx context.Context, name string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + client, err := azeventhubs.NewConsumerClientFromConnectionString(bt.ConnectionString, bt.HubName, azeventhubs.DefaultConsumerGroup, nil) if err != nil { @@ -240,7 +254,9 @@ func (bt *balanceTester) process(ctx context.Context, name string) error { return err } + ch := make(chan struct{}) go func() { + defer close(ch) for { pc := processor.NextPartitionClient(ctx) @@ -252,30 +268,30 @@ func (bt *balanceTester) process(ctx context.Context, name string) error { } }() - return processor.Run(ctx) + err = processor.Run(ctx) + cancel() + <-ch + + return err } func (bt *balanceTester) keepAlive(ctx context.Context, pc *azeventhubs.ProcessorPartitionClient) { -ReceiveLoop: + defer func() { + _ = pc.Close(context.Background()) + }() + for { - _, err := pc.ReceiveEvents(ctx, 1, nil) - - var eventHubErr *azeventhubs.Error - - switch { - case errors.Is(err, context.Canceled): - // test is over, we'll see how everything shook out. - break ReceiveLoop - case errors.As(err, &eventHubErr) && eventHubErr.Code == azeventhubs.ErrorCodeOwnershipLost: - // we've swapped ownership with _someone_. Record it in the map, but be careful of potential ordering - // and only delete if we're the one in the map! - break ReceiveLoop + if _, err := pc.ReceiveEvents(ctx, 1, nil); err != nil { + break } } } type unbalancedError error +// checkBalance queries the checkpoint store. +// It returns `nil` if no error occurred and the checkpoint store was balanced. +// If the checkpoint store is NOT balanced it returns an unbalancedError func (bt *balanceTester) checkBalance(ctx context.Context) error { blobClient, err := azblob.NewClientFromConnectionString(bt.StorageConnectionString, nil) @@ -373,7 +389,7 @@ type stats struct { } func (s *stats) String() string { - jsonBytes, err := json.MarshalIndent(s, " ", " ") + jsonBytes, err := json.Marshal(s) if err != nil { panic(err) diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go index d58e601caac6..8b974a05bb9d 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/batch_stress_tester.go @@ -47,7 +47,7 @@ func getBatchTesterParams(args []string) (batchTesterParams, error) { fs.IntVar(¶ms.paddingBytes, "padding", 1024, "Extra number of bytes to add into each message body") fs.StringVar(¶ms.partitionID, "partition", "0", "Partition ID to send and receive events to") fs.IntVar(¶ms.maxDeadlineExceeded, "maxtimeouts", 10, "Number of consecutive receive timeouts allowed before quitting") - enableVerboseLoggingFn := addVerboseLoggingFlag(fs) + enableVerboseLoggingFn := addVerboseLoggingFlag(fs, nil) sleepAfterFn := addSleepAfterFlag(fs) diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/multi_balance_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/multi_balance_tester.go index 1165b3006504..c3f0507ecd11 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/multi_balance_tester.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/multi_balance_tester.go @@ -6,7 +6,10 @@ package tests import ( "context" "flag" + "fmt" + golog "log" "os" + "strings" "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" @@ -15,42 +18,62 @@ import ( func MultiBalanceTester(ctx context.Context) error { fs := flag.NewFlagSet("", flag.ContinueOnError) - enableVerboseLoggingFn := addVerboseLoggingFlag(fs) rounds := fs.Int("rounds", 1, "Number of rounds to run") if err := fs.Parse(os.Args[2:]); err != nil { return err } - enableVerboseLoggingFn() + ch := make(chan string, 10000) + + log.SetEvents(EventBalanceTest, azeventhubs.EventConsumer) + log.SetListener(func(e log.Event, s string) { + if e == azeventhubs.EventConsumer && + !strings.Contains(s, "Asked for") { + return + } + + ch <- fmt.Sprintf("[%s] %s", e, s) + }) + + go func() { + for { + select { + case s := <-ch: + golog.Println(s) + case <-ctx.Done(): + break + } + } + }() for i := 0; i < *rounds; i++ { testData := []struct { Processors int Strategy azeventhubs.ProcessorStrategy }{ - // {32, azeventhubs.ProcessorStrategyGreedy}, - // {31, azeventhubs.ProcessorStrategyGreedy}, - // {16, azeventhubs.ProcessorStrategyGreedy}, - // {5, azeventhubs.ProcessorStrategyGreedy}, - // {1, azeventhubs.ProcessorStrategyGreedy}, - - // {32, azeventhubs.ProcessorStrategyBalanced}, - // {31, azeventhubs.ProcessorStrategyBalanced}, - // {16, azeventhubs.ProcessorStrategyBalanced}, + {32, azeventhubs.ProcessorStrategyGreedy}, + {31, azeventhubs.ProcessorStrategyGreedy}, + {16, azeventhubs.ProcessorStrategyGreedy}, + {5, azeventhubs.ProcessorStrategyGreedy}, + {1, azeventhubs.ProcessorStrategyGreedy}, + + {32, azeventhubs.ProcessorStrategyBalanced}, + {31, azeventhubs.ProcessorStrategyBalanced}, + {16, azeventhubs.ProcessorStrategyBalanced}, {5, azeventhubs.ProcessorStrategyBalanced}, - // {1, azeventhubs.ProcessorStrategyBalanced}, + {1, azeventhubs.ProcessorStrategyBalanced}, } for _, td := range testData { - log.Writef(EventBalanceTest, "----- BEGIN[%d]: %s, %d processors -----", *rounds, td.Strategy, td.Processors) + log.Writef(EventBalanceTest, "----- BEGIN[%d]: %s, %d processors -----", i, td.Strategy, td.Processors) if err := balanceTesterImpl(ctx, td.Processors, td.Strategy); err != nil { - log.Writef(EventBalanceTest, "----- END[%d]: FAIL: %s, %d processors, %s -----", *rounds, td.Strategy, td.Processors, err) + log.Writef(EventBalanceTest, "----- END[%d]: FAIL: %s, %d processors, %s -----", i, td.Strategy, td.Processors, err) return err } - log.Writef(EventBalanceTest, "----- END[%d]: %s, %d processors -----", *rounds, td.Strategy, td.Processors) + log.Writef(EventBalanceTest, "----- END[%d]: %s, %d processors -----", i, td.Strategy, td.Processors) } } diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go index b366a3002803..13485da36fb7 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/processor_stress_tester.go @@ -53,7 +53,7 @@ func newProcessorStressTest(args []string) (*processorStressTest, error) { eventsPerRound := fs.Int("send", 5000, "Number of events to send per round") rounds := fs.Int64("rounds", 100, "Number of rounds. -1 means math.MaxInt64") prefetch := fs.Int("prefetch", 0, "Number of events to set for the prefetch. Negative numbers disable prefetch altogether. 0 uses the default for the package.") - enableVerboseLoggingFn := addVerboseLoggingFlag(fs) + enableVerboseLoggingFn := addVerboseLoggingFlag(fs, nil) sleepAfterFn := addSleepAfterFlag(fs) if err := fs.Parse(args); err != nil { diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go index 4cf69795fa7b..8b6a1b17d2f4 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go @@ -397,28 +397,30 @@ func addSleepAfterFlag(fs *flag.FlagSet) func() { } } -func addVerboseLoggingFlag(fs *flag.FlagSet) func() { +func addVerboseLoggingFlag(fs *flag.FlagSet, customLogFn func(verbose string, e azlog.Event, s string)) func() { verbose := fs.String("v", "", "Enable verbose SDK logging. Valid values are test or sdk or all") + logFn := func(e azlog.Event, s string) { + log.Printf("[%s] %s", e, s) + } + + if customLogFn != nil { + logFn = func(e azlog.Event, s string) { + customLogFn(*verbose, e, s) + } + } + return func() { switch *verbose { case "": case "test": azlog.SetEvents(EventBalanceTest) - - azlog.SetListener(func(e azlog.Event, s string) { - log.Printf("[%s] %s", e, s) - }) + azlog.SetListener(logFn) case "sdk": azlog.SetEvents(EventBalanceTest, azeventhubs.EventConsumer, azeventhubs.EventProducer) - - azlog.SetListener(func(e azlog.Event, s string) { - log.Printf("[%s] %s", e, s) - }) + azlog.SetListener(logFn) case "all": - azlog.SetListener(func(e azlog.Event, s string) { - log.Printf("[%s] %s", e, s) - }) + azlog.SetListener(logFn) default: fmt.Printf("%s is not a valid logging value. Valid values are test or sdk or all", *verbose) } diff --git a/sdk/messaging/azeventhubs/processor_load_balancer.go b/sdk/messaging/azeventhubs/processor_load_balancer.go index aa4e620e796e..1483f6af5dab 100644 --- a/sdk/messaging/azeventhubs/processor_load_balancer.go +++ b/sdk/messaging/azeventhubs/processor_load_balancer.go @@ -51,14 +51,8 @@ type loadBalancerInfo struct { // one extra partition. claimMorePartitions bool - // maxAllowed is the maximum allowed, including a potential extra partition if - // that's possible with the num-partitions/num-owners. maxAllowed int - // minRequired is the minimum required. If partitions are evenly divisible amongst - // all the owners this will be the same as maxAllowed. - minRequired int - raw []Ownership } @@ -175,30 +169,21 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par maxAllowed := minRequired allowExtraPartition := len(partitionIDs)%len(groupedByOwner) > 0 - if allowExtraPartition { + // only allow owners to keep extra partitions if we've already met our minimum bar. Otherwise + // above the minimum is fair game. + if allowExtraPartition && len(groupedByOwner[lb.details.ClientID]) >= minRequired { maxAllowed += 1 } var aboveMax []Ownership - { - threshold := minRequired - - // only allow other owners to keep extra partitions if we have already met our minimum - // bar - without this we could end up starving ourselves and never grabbing more partitions. - if allowExtraPartition && len(groupedByOwner[lb.details.ClientID]) >= minRequired { - // raise the waterline - we've got all we need so we don't need to steal from processors - // that only have one extra partition (ie, avoids thrash) - threshold = maxAllowed - } - for id, ownerships := range groupedByOwner { - if id == lb.details.ClientID { - continue - } + for id, ownerships := range groupedByOwner { + if id == lb.details.ClientID { + continue + } - if len(ownerships) > threshold { - aboveMax = append(aboveMax, ownerships...) - } + if len(ownerships) > maxAllowed { + aboveMax = append(aboveMax, ownerships...) } } @@ -211,17 +196,16 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par // - I have too many. We expect to have some stolen from us, but we'll maintain // ownership for now. claimMorePartitions = false - log.Writef(EventConsumer, "[%s] Owns %d/%d, no more needed", lb.details.ClientID, len(current), maxAllowed) - } else if allowExtraPartition && len(current) == minRequired { + + } else if allowExtraPartition && len(current) == maxAllowed-1 { // In the 'allowExtraPartition' scenario, some consumers will have an extra partition - // since things don't divide up evenly. We only have the minimum, which means we _might_ + // since things don't divide up evenly. We're one under the max, which means we _might_ // be able to claim another one. // // We will attempt to grab _one_ more but only if there are free partitions available // or if one of the consumers has more than the max allowed. claimMorePartitions = len(unownedOrExpired) > 0 || len(aboveMax) > 0 - log.Writef(EventConsumer, "[%s] Unowned/expired: %d, above max: %d, need to claim: %t", lb.details.ClientID, len(unownedOrExpired), @@ -230,13 +214,12 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par } return loadBalancerInfo{ + current: current, + unownedOrExpired: unownedOrExpired, aboveMax: aboveMax, claimMorePartitions: claimMorePartitions, - current: current, - maxAllowed: maxAllowed, - minRequired: minRequired, raw: ownerships, - unownedOrExpired: unownedOrExpired, + maxAllowed: maxAllowed, }, nil } @@ -249,14 +232,12 @@ func (lb *processorLoadBalancer) greedyLoadBalancer(ctx context.Context, lbinfo randomOwnerships := getRandomOwnerships(lb.rnd, lbinfo.unownedOrExpired, lbinfo.maxAllowed-len(ours)) ours = append(ours, randomOwnerships...) - // since we're stealing we want to only grab up to our minimum, not the theoretical max with an extra - // partition. - if len(ours) < lbinfo.minRequired { + if len(ours) < lbinfo.maxAllowed { log.Writef(EventConsumer, "Not enough expired or unowned partitions, will need to steal from other processors") // if that's not enough then we'll randomly steal from any owners that had partitions // above the maximum. - randomOwnerships := getRandomOwnerships(lb.rnd, lbinfo.aboveMax, lbinfo.minRequired-len(ours)) + randomOwnerships := getRandomOwnerships(lb.rnd, lbinfo.aboveMax, lbinfo.maxAllowed-len(ours)) ours = append(ours, randomOwnerships...) } @@ -267,7 +248,7 @@ func (lb *processorLoadBalancer) greedyLoadBalancer(ctx context.Context, lbinfo return ours } -// balancedLoadBalancer attempts to split the partition load out between the available +// balancedLoadBalancer attempts to split the partition load out between the available 0 // consumers so each one has an even amount (or even + 1, if the # of consumers and # // of partitions doesn't divide evenly). // diff --git a/sdk/messaging/azeventhubs/processor_load_balancers_test.go b/sdk/messaging/azeventhubs/processor_load_balancers_test.go index b33f08aa3bb1..3d08d661b36f 100644 --- a/sdk/messaging/azeventhubs/processor_load_balancers_test.go +++ b/sdk/messaging/azeventhubs/processor_load_balancers_test.go @@ -5,6 +5,7 @@ package azeventhubs import ( "context" "fmt" + "math/rand" "sort" "strconv" "testing" @@ -330,126 +331,73 @@ func TestProcessorLoadBalancers_AnyStrategy_GrabRelinquishedPartition(t *testing } } -func TestUnit_ProcessorLoadBalancer_allPartitionsAlreadyOwned(t *testing.T) { - setup := func(t *testing.T, owners string) (*processorLoadBalancer, []string) { - var ownerships []Ownership - var partitions []string - - details := consumerClientDetails{ - FullyQualifiedNamespace: "fakens.servicebus.windows.net", - ConsumerGroup: DefaultConsumerGroup, - EventHubName: "eventhub", - ClientID: "%", - } - - for partNum, rn := range owners { - partID := fmt.Sprintf("%d", partNum) - partitions = append(partitions, partID) - - ownerships = append(ownerships, Ownership{ - FullyQualifiedNamespace: details.FullyQualifiedNamespace, - ConsumerGroup: details.ConsumerGroup, - EventHubName: details.EventHubName, - PartitionID: partID, - OwnerID: string(rn), +func TestUnit_ProcessorLoadBalancer_Balanced(t *testing.T) { + for _, td := range []string{"abc", "abbc", "abbcc"} { + for _, ownerID := range []string{"a", "b", "c"} { + t.Run(fmt.Sprintf("Layout %q, with owner %q", td, ownerID), func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, td, ownerID) + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.False(t, lbi.claimMorePartitions) }) } - - cps := newCheckpointStoreForTest() - - claimed, err := cps.ClaimOwnership(context.Background(), ownerships, nil) - require.NoError(t, err) - require.Equal(t, len(partitions), len(claimed), "All partitions are owned") - - return newProcessorLoadBalancer(cps, details, - ProcessorStrategyBalanced, // we're not claiming for this test so this is ignored. - time.Hour), partitions // no partitions should be considered expired. - } - - ownersAsString := func(t *testing.T, owners []Ownership, total int) string { - bits := make([]rune, total) - - for i := 0; i < len(bits); i++ { - bits[i] = '.' - } - - for _, o := range owners { - idx, err := strconv.ParseInt(o.PartitionID, 10, 32) - require.NoError(t, err) - bits[int(idx)] = ([]rune(o.OwnerID))[0] - } - - return string(bits) } - testData := []struct { - StartState string // each letter represents a partition index. '%' is _our_ client ID. - AboveMax string // '.' means that partition index isn't available - ShouldClaim bool // whether we're supposed to claim more partitions or not. - MaxAllowed int // the max number of partitions any processor is allowed to own - MinRequired int // the minimum amount that every processor should have. - Title string - }{ - // basic stuff (some of these cases overlap) - {"aabbb", "aabbb", true, 2, 1, "two owners have extra partitions"}, - {"aaabbb", "aaabbb", true, 2, 2, "even split even when new owner added"}, - {"aabbcc", "aabbcc", true, 2, 1, "three owners have an extra partition"}, - - // extra partition states - {"abb", ".bb", true, 1, 1, "one owner has an extra partition"}, - - // cases where we already own some partitions - {"ab%", "...", false, 1, 1, "we already own the minimum (no extra partitions)"}, - {"abb%", "....", false, 2, 1, "we already own the minimum (with extra partitions)"}, - {"abb%%", ".....", false, 2, 1, "we already have an extra partition"}, - } - - for _, td := range testData { - t.Run(td.Title, func(t *testing.T) { - require.Equal(t, len(td.StartState), len(td.AboveMax)) - - lb, partitions := setup(t, td.StartState) - - lbInfo, err := lb.getAvailablePartitions(context.Background(), partitions) - require.NoError(t, err) - require.Empty(t, lbInfo.unownedOrExpired, "no partitions were expired in this load balancing round") - require.Equal(t, td.AboveMax, ownersAsString(t, lbInfo.aboveMax, len(partitions))) - require.Equal(t, td.ShouldClaim, lbInfo.claimMorePartitions) - require.Equal(t, td.MaxAllowed, lbInfo.maxAllowed) - require.Equal(t, td.MinRequired, lbInfo.minRequired) - - ownerships := lb.greedyLoadBalancer(context.Background(), lbInfo) - require.NotEmpty(t, ownerships) - - numNewOwnerships := ownershipDifference(lbInfo.current, ownerships) - - if td.ShouldClaim { - // we expect that we've got new ownerships - require.Equal(t, lbInfo.minRequired-len(lbInfo.current), numNewOwnerships) - } else { - // we expect to not get new ownerships - require.Equal(t, 0, numNewOwnerships) - } - }) - } + lb, parts := createLoadBalancerForTest(t, "aaaabbb", "b") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.False(t, lbi.claimMorePartitions) + require.Equal(t, 4, lbi.maxAllowed) } -func ownershipDifference(origOwnerships []Ownership, newOwnerships []Ownership) int { - m := map[string]bool{} - - for _, o := range origOwnerships { - m[o.PartitionID] = true - } - - unique := 0 - - for _, o := range newOwnerships { - if !m[o.PartitionID] { - unique++ - } - } - - return unique +func TestUnit_ProcessorLoadBalancer_Unbalanced(t *testing.T) { + t.Run("new owner enters the field", func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, "abb", "c") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 1, lbi.maxAllowed) + require.Equal(t, ".c.", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("single partition deficit", func(t *testing.T) { + // existing owner, needs to steal a partition + lb, parts := createLoadBalancerForTest(t, "aaaabb", "b") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 3, lbi.maxAllowed) + require.Equal(t, "b...bb", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("deficit by more than a single partition", func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, "aaabbbcccd", "d") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 2, lbi.maxAllowed) + require.Equal(t, "d........d", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("not the only owner with a partition deficit", func(t *testing.T) { + lb, parts := createLoadBalancerForTest(t, "aaabbbccde", "d") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 2, lbi.maxAllowed) + require.Equal(t, "d.......d.", greedyLoadBalance(lb, lbi, parts)) + }) + + t.Run("can grab an extra partition", func(t *testing.T) { + // this tests when we have met our minimum but another processor actually owns + // more than it should (even accounting for the extra partition). + lb, parts := createLoadBalancerForTest(t, "aaaabbc", "b") + lbi, err := lb.getAvailablePartitions(context.Background(), parts) + require.NoError(t, err) + require.True(t, lbi.claimMorePartitions) + require.Equal(t, 3, lbi.maxAllowed) + require.Equal(t, "b...bb.", greedyLoadBalance(lb, lbi, parts)) + }) } func mapToStrings[T any](src []T, fn func(t T) string) []string { @@ -542,3 +490,79 @@ func groupBy[T any](src []T, fn func(t T) string) map[string][]T { return dest } + +// ownershipsAsString converts the list of owners into a string where each character represents a partition +// with the owner or a '.' if nobody owns a partition. +// +// Examples: +// - '..' => two partitions, both unowned. +// - 'a.' => first partition is owned by 'a', second partition is unowned. +// - 'ab' => first partition is owned by 'a', second partition owned by 'b' +func ownershipsAsString(owners []Ownership, total int) string { + bits := make([]rune, total) + + for i := 0; i < len(bits); i++ { + bits[i] = '.' + } + + for _, o := range owners { + idx, err := strconv.ParseInt(o.PartitionID, 10, 32) + + if err != nil { + panic(err) + } + + bits[int(idx)] = ([]rune(o.OwnerID))[0] + } + + return string(bits) +} + +func createLoadBalancerForTest(t *testing.T, initialState string, clientID string) (*processorLoadBalancer, []string) { + var ownerships []Ownership + var partitions []string + + details := consumerClientDetails{ + FullyQualifiedNamespace: "fakens.servicebus.windows.net", + ConsumerGroup: DefaultConsumerGroup, + EventHubName: "eventhub", + ClientID: clientID, + } + + for partNum, rn := range initialState { + partID := fmt.Sprintf("%d", partNum) + partitions = append(partitions, partID) + + ownerships = append(ownerships, Ownership{ + FullyQualifiedNamespace: details.FullyQualifiedNamespace, + ConsumerGroup: details.ConsumerGroup, + EventHubName: details.EventHubName, + PartitionID: partID, + OwnerID: string(rn), + }) + } + + cps := newCheckpointStoreForTest() + + claimed, err := cps.ClaimOwnership(context.Background(), ownerships, nil) + require.NoError(t, err) + require.Equal(t, len(partitions), len(claimed), "All partitions are owned") + + lb := newProcessorLoadBalancer(cps, details, + ProcessorStrategyBalanced, // we're not claiming for this test so this is ignored. + time.Hour) // no partitions should be considered expired. + + lb.rnd = rand.New(rand.NewSource(1)) + + return lb, partitions +} + +func greedyLoadBalance(lb *processorLoadBalancer, lbi loadBalancerInfo, partitionIDs []string) string { + // make the results deterministic by using the same seed for RNG (taken care of in [createLoadBalancerForTest]) + // and ensuring a consistent ordering of this slice since the greedyLoadBalancer generates permutations from it. + sort.Slice(lbi.aboveMax, func(i, j int) bool { + return lbi.aboveMax[i].PartitionID < lbi.aboveMax[j].PartitionID + }) + + return ownershipsAsString(lb.greedyLoadBalancer(context.Background(), lbi), len(partitionIDs)) +}