Commit

Adding in some TODO comments to reevaluate this load balancer code in the next year.
Richard Park committed Dec 16, 2023
1 parent 770d2ae commit 836b7da
Showing 3 changed files with 63 additions and 41 deletions.
@@ -1,3 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package tests

import (
98 changes: 57 additions & 41 deletions sdk/messaging/azeventhubs/processor_load_balancer.go
@@ -46,15 +46,10 @@ type loadBalancerInfo struct {
// it contains _all_ the partitions for that particular consumer.
aboveMax []Ownership

// maxAllowed is the maximum number of partitions a consumer should have
// If partitions do not divide evenly this will be the "theoretical" max
// with the assumption that this particular consumer will get an extra
// partition.
maxAllowed int

// extraPartitionPossible is true if the partitions cannot split up evenly
// amongst all the known consumers.
extraPartitionPossible bool
// claimMorePartitions is true when we should try to claim more partitions
// because we're under the limit, or we're in a situation where we could claim
// one extra partition.
claimMorePartitions bool

raw []Ownership
}
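For context on the quota fields discussed here, a minimal, hypothetical sketch of the arithmetic the old comments describe (example numbers only, not code from this commit):

// Hypothetical illustration of the partition-quota arithmetic described in the
// comments above; the values are made up for the example.
package main

import "fmt"

func main() {
	partitionCount := 10 // example: 10 partitions...
	consumerCount := 3   // ...spread across 3 consumers

	minRequired := partitionCount / consumerCount      // 3: every consumer is guaranteed this many
	extraPossible := partitionCount%consumerCount != 0 // true: 10 does not divide evenly by 3

	maxAllowed := minRequired
	if extraPossible {
		maxAllowed++ // 4: the "theoretical" max if this consumer picks up the leftover partition
	}

	fmt.Println(minRequired, extraPossible, maxAllowed) // 3 true 4
}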
@@ -68,33 +63,9 @@ func (lb *processorLoadBalancer) LoadBalance(ctx context.Context, partitionIDs [
return nil, err
}

claimMorePartitions := true

if len(lbinfo.current) >= lbinfo.maxAllowed {
// - I have _exactly_ the right amount
// or
// - I have too many. We expect to have some stolen from us, but we'll maintain
// ownership for now.
claimMorePartitions = false
log.Writef(EventConsumer, "[%s] Owns %d/%d, no more needed", lb.details.ClientID, len(lbinfo.current), lbinfo.maxAllowed)
} else if lbinfo.extraPartitionPossible && len(lbinfo.current) == lbinfo.maxAllowed-1 {
// In the 'extraPartitionPossible' scenario, some consumers will have an extra partition
// since things don't divide up evenly. We're one under the max, which means we _might_
// be able to claim another one.
//
// We will attempt to grab _one_ more but only if there are free partitions available
// or if one of the consumers has more than the max allowed.
claimMorePartitions = len(lbinfo.unownedOrExpired) > 0 || len(lbinfo.aboveMax) > 0
log.Writef(EventConsumer, "[%s] Unowned/expired: %d, above max: %d, need to claim: %t",
lb.details.ClientID,
len(lbinfo.unownedOrExpired),
len(lbinfo.aboveMax),
claimMorePartitions)
}

ownerships := lbinfo.current

if claimMorePartitions {
if lbinfo.claimMorePartitions {
switch lb.strategy {
case ProcessorStrategyGreedy:
log.Writef(EventConsumer, "[%s] Using greedy strategy to claim partitions", lb.details.ClientID)
@@ -118,7 +89,7 @@ }
}

if log.Should(EventConsumer) {
log.Writef(EventConsumer, "[%s] Asked for %s, got %s", lb.details.ClientID, partitionsForOwnerships(ownerships), partitionsForOwnerships(actual))
log.Writef(EventConsumer, "[%0.5s] Asked for %s, got %s", lb.details.ClientID, partitionsForOwnerships(ownerships), partitionsForOwnerships(actual))
}

return actual, nil
@@ -199,6 +170,8 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par
// only allow owners to keep extra partitions if we've already met our minimum bar. Otherwise
// above the minimum is fair game.
if allowExtraPartition && len(groupedByOwner[lb.details.ClientID]) >= minRequired {
// raise the waterline - we've got all we need so we don't need to steal from processors
// that only have one extra partition (ie, avoids thrash)
maxAllowed += 1
}
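To make the "waterline" comment concrete, a small hypothetical walk-through of just this bump (the starting value of maxAllowed is assumed for the example, not copied from the surrounding code):

// Hypothetical walk-through of the waterline bump above - not part of this diff.
func illustrateWaterline() int {
	minRequired := 2            // e.g. 7 partitions spread over 3 consumers
	maxAllowed := minRequired   // assumed starting point for this sketch
	allowExtraPartition := true // 7 % 3 != 0, so one consumer may hold a 3rd partition
	ownedByMe := 2              // this consumer already holds its guaranteed minimum

	if allowExtraPartition && ownedByMe >= minRequired {
		// Raising the waterline: a peer holding exactly one extra partition is no
		// longer counted as over-quota, so this consumer won't steal what it doesn't need.
		maxAllowed += 1
	}

	return maxAllowed // 3 in this example
}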

@@ -214,13 +187,56 @@ }
}
}

// TODO: getAvailablePartitions() was exporting too much state.
//
// TODO: I've moved this logic into getAvailablePartitions, where it makes more sense
// TODO: in context, and removed `lbinfo.maxAllowed` and `lbinfo.extraPartitionPossible` from the load balancer
// TODO: info that's returned.
//

claimMorePartitions := true
current := groupedByOwner[lb.details.ClientID]

// TODO: this code needs to be checked for two variants - I am allowing an extra partition or
// TODO: I'm not (based on the logic above that restricts it).

// TODO: 'instead of maxAllowed we could just use minRequired?'
if len(current) >= maxAllowed {
// - I have _exactly_ the right amount
// or
// - I have too many. We expect to have some stolen from us, but we'll maintain
// ownership for now.
claimMorePartitions = false
log.Writef(EventConsumer, "[%s] Owns %d/%d, no more needed", lb.details.ClientID, len(current), maxAllowed)

// TODO: 'instead of maxAllowed-1 we could just use minRequired?'
} else if allowExtraPartition && len(current) == maxAllowed-1 {
// In the 'extraPartitionPossible' scenario, some consumers will have an extra partition
// since things don't divide up evenly. We're one under the max, which means we _might_
// be able to claim another one.
//
// We will attempt to grab _one_ more but only if there are free partitions available
// or if one of the consumers has more than the max allowed.
claimMorePartitions = len(unownedOrExpired) > 0 || len(aboveMax) > 0
log.Writef(EventConsumer, "[%s] Unowned/expired: %d, above max: %d, need to claim: %t",
lb.details.ClientID,
len(unownedOrExpired),
len(aboveMax),
claimMorePartitions)
}

return loadBalancerInfo{
current: groupedByOwner[lb.details.ClientID],
unownedOrExpired: unownedOrExpired,
aboveMax: aboveMax,
maxAllowed: maxAllowed,
extraPartitionPossible: allowExtraPartition,
raw: ownerships,
current: current,
unownedOrExpired: unownedOrExpired,
aboveMax: aboveMax,
claimMorePartitions: claimMorePartitions,
raw: ownerships,
}, nil
}
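For readability, here is a simplified, free-standing restatement of the claim decision in the hunk above (hypothetical helper name and signature; the real code works directly on the []Ownership slices and logs its decision):

// Hypothetical sketch only - mirrors the claimMorePartitions branch above using
// plain counts instead of []Ownership slices.
func shouldClaimMore(owned, maxAllowed, unownedOrExpired, aboveMax int, allowExtra bool) bool {
	if owned >= maxAllowed {
		// At or over quota: keep current ownership and let peers steal the excess.
		return false
	}

	if allowExtra && owned == maxAllowed-1 {
		// One under an uneven split's ceiling: only chase the optional extra
		// partition if something is actually free or a peer is over quota.
		return unownedOrExpired > 0 || aboveMax > 0
	}

	// Under quota: always try to claim more.
	return true
}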

3 changes: 3 additions & 0 deletions sdk/messaging/azeventhubs/processor_load_balancers_test.go
@@ -410,6 +410,9 @@ func TestUnit_ProcessorLoadBalancer_allPartitionsAlreadyOwned(t *testing.T) {
require.NoError(t, err)
require.Empty(t, lbInfo.unownedOrExpired, "no partitions were expired in this load balancing round")
require.Equal(t, td.AboveMax, ownersAsString(t, lbInfo.aboveMax, len(partitions)))

// TODO: adjust this test to check for 'lbinfo.claimMorePartitions'
})
}
}
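One possible shape for the assertion that TODO asks for (hypothetical: td.ExpectedClaimMore would be a new column in the test's table; it is not part of this commit):

// Hypothetical fragment for the TODO above - not in this commit.
require.Equal(t, td.ExpectedClaimMore, lbInfo.claimMorePartitions,
	"claim decision should match the expected state for this ownership layout")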
