Skip to content

Commit

Permalink
[azeventhubs] Fixing an issue where a processor wouldn't grab partiti…
Browse files Browse the repository at this point in the history
…ons (#22153)

The processor load balancer had a bug where a processor would not claim partitions even if it didn't yet have the proper share.

Added in stress tests (balance, multibalance) to run several scenarios with our supported strategies and different parallel owners.

Fixes #22097
  • Loading branch information
richardpark-msft authored Jan 8, 2024
1 parent 06a9327 commit e63fa24
Show file tree
Hide file tree
Showing 14 changed files with 801 additions and 96 deletions.
8 changes: 2 additions & 6 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 1.0.3 (Unreleased)

### Features Added

### Breaking Changes
## 1.0.3 (2024-01-16)

### Bugs Fixed

### Other Changes
- Processor distributes partitions optimally, which would result in idle or over-assigned processors. (PR#22153)

## 1.0.2 (2023-11-07)

Expand Down
5 changes: 5 additions & 0 deletions sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package azeventhubs

import (
"context"
"sort"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -277,6 +278,10 @@ func (cps *testCheckpointStore) ListOwnership(ctx context.Context, fullyQualifie
ownerships = append(ownerships, v)
}

sort.Slice(ownerships, func(i, j int) bool {
return ownerships[i].PartitionID < ownerships[j].PartitionID
})

return ownerships, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ matrix:
prefetch: 0
verbose: ""
sleepAfter: "5m"
multibalance:
testTarget: multibalance
rounds: 20
verbose: ""
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,25 @@
"apiVersion": "[variables('apiVersion')]",
"name": "[variables('authorizationName')]",
"location": "[variables('location')]",
"dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"],
"dependsOn": [
"[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"
],
"properties": {
"rights": ["Listen", "Manage", "Send"]
"rights": [
"Listen",
"Manage",
"Send"
]
}
},
{
"type": "Microsoft.EventHub/namespaces/eventhubs",
"apiVersion": "[variables('apiVersion')]",
"name": "[variables('eventHubNameFull')]",
"location": "[variables('location')]",
"dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"],
"dependsOn": [
"[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"
],
"properties": {
"messageRetentionInDays": 7,
"partitionCount": 32
Expand All @@ -73,7 +81,9 @@
"apiVersion": "[variables('apiVersion')]",
"name": "[concat(variables('namespaceName'), '/default')]",
"location": "[variables('location')]",
"dependsOn": ["[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"],
"dependsOn": [
"[resourceId('Microsoft.EventHub/namespaces', variables('namespaceName'))]"
],
"properties": {
"defaultAction": "Deny",
"virtualNetworkRules": [],
Expand Down Expand Up @@ -127,13 +137,15 @@
"name": "[concat('default/', variables('containerName'))]",
"type": "blobServices/containers",
"apiVersion": "[variables('storageApiVersion')]",
"dependsOn": ["[variables('storageAccountName')]"]
"dependsOn": [
"[variables('storageAccountName')]"
]
}
]
},
],
"outputs": {
"EVENTHUB_NAME": {
"EVENTHUB_NAME_STRESS": {
"type": "string",
"value": "[variables('eventHubName')]"
},
Expand Down
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/internal/eh/stress/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ func main() {
fn func(ctx context.Context) error
}{
{name: "batch", fn: tests.BatchStressTester},
{name: "balance", fn: tests.BalanceTester},
{name: "multibalance", fn: tests.MultiBalanceTester},
{name: "processor", fn: tests.ProcessorStressTester},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ spec:
- >
set -ex;
mkdir -p "$DEBUG_SHARE";
{{if eq .Stress.testTarget "multibalance" }}
/app/stress "{{.Stress.testTarget}}" "-rounds" "{{.Stress.rounds}}" "{{.Stress.verbose}}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{else}}
/app/stress "{{.Stress.testTarget}}" "-rounds" "{{.Stress.rounds}}" "-prefetch" "{{.Stress.prefetch}}" "{{.Stress.verbose}}" "-sleepAfter" "{{.Stress.sleepAfter}}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{end}}
# Pulls the image on pod start, always. We tend to push to the same image and tag over and over again
# when iterating, so this is a must.
imagePullPolicy: Always
Expand Down
Loading

0 comments on commit e63fa24

Please sign in to comment.