Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
richardpark-msft authored Jan 5, 2024
1 parent 0185750 commit f99199e
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 198 deletions.
5 changes: 5 additions & 0 deletions sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package azeventhubs

import (
"context"
"sort"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -277,6 +278,10 @@ func (cps *testCheckpointStore) ListOwnership(ctx context.Context, fullyQualifie
ownerships = append(ownerships, v)
}

sort.Slice(ownerships, func(i, j int) bool {
return ownerships[i].PartitionID < ownerships[j].PartitionID
})

return ownerships, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"errors"
"flag"
"fmt"
golog "log"
"os"
"strings"
"sync"
"time"

Expand All @@ -31,13 +33,20 @@ func BalanceTester(ctx context.Context) error {

numProcessors := fs.Int("processors", 32, "The # of processor instances to run")
strategy := fs.String("strategy", string(azeventhubs.ProcessorStrategyBalanced), "The partition acquisition strategy to use (balanced, greedy)")
enableVerboseLoggingFn := addVerboseLoggingFlag(fs)

if err := fs.Parse(os.Args[2:]); err != nil {
return err
}

enableVerboseLoggingFn()
log.SetEvents(EventBalanceTest, azeventhubs.EventConsumer)
log.SetListener(func(e log.Event, s string) {
if e == azeventhubs.EventConsumer &&
!strings.Contains(s, "Asked for") {
return
}

golog.Printf("[%s] %s", e, s)
})

return balanceTesterImpl(ctx, *numProcessors, azeventhubs.ProcessorStrategy(*strategy))
}
Expand Down Expand Up @@ -99,7 +108,7 @@ func (bt *balanceTester) Run(ctx context.Context) error {
wg := sync.WaitGroup{}
failuresChan := make(chan error, bt.numProcessors)

testCtx, cancelTest := context.WithTimeout(context.Background(), 5*time.Minute)
testCtx, cancelTest := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancelTest()

mu := sync.Mutex{}
Expand All @@ -111,6 +120,8 @@ func (bt *balanceTester) Run(ctx context.Context) error {
var firstBalance time.Duration

Loop:
// poll every 5 seconds to see if the checkpoint store is "balanced" (all owners
// own a fair-share of the partitions).
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -204,6 +215,9 @@ func (bt *balanceTester) Run(ctx context.Context) error {
}

func (bt *balanceTester) process(ctx context.Context, name string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

client, err := azeventhubs.NewConsumerClientFromConnectionString(bt.ConnectionString, bt.HubName, azeventhubs.DefaultConsumerGroup, nil)

if err != nil {
Expand Down Expand Up @@ -240,7 +254,9 @@ func (bt *balanceTester) process(ctx context.Context, name string) error {
return err
}

ch := make(chan struct{})
go func() {
defer close(ch)
for {
pc := processor.NextPartitionClient(ctx)

Expand All @@ -252,30 +268,30 @@ func (bt *balanceTester) process(ctx context.Context, name string) error {
}
}()

return processor.Run(ctx)
err = processor.Run(ctx)
cancel()
<-ch

return err
}

func (bt *balanceTester) keepAlive(ctx context.Context, pc *azeventhubs.ProcessorPartitionClient) {
ReceiveLoop:
defer func() {
_ = pc.Close(context.Background())
}()

for {
_, err := pc.ReceiveEvents(ctx, 1, nil)

var eventHubErr *azeventhubs.Error

switch {
case errors.Is(err, context.Canceled):
// test is over, we'll see how everything shook out.
break ReceiveLoop
case errors.As(err, &eventHubErr) && eventHubErr.Code == azeventhubs.ErrorCodeOwnershipLost:
// we've swapped ownership with _someone_. Record it in the map, but be careful of potential ordering
// and only delete if we're the one in the map!
break ReceiveLoop
if _, err := pc.ReceiveEvents(ctx, 1, nil); err != nil {
break
}
}
}

type unbalancedError error

// checkBalance queries the checkpoint store.
// It returns `nil` if no error occurred and the checkpoint store was balanced.
// If the checkpoint store is NOT balanced it returns an unbalancedError
func (bt *balanceTester) checkBalance(ctx context.Context) error {
blobClient, err := azblob.NewClientFromConnectionString(bt.StorageConnectionString, nil)

Expand Down Expand Up @@ -373,7 +389,7 @@ type stats struct {
}

func (s *stats) String() string {
jsonBytes, err := json.MarshalIndent(s, " ", " ")
jsonBytes, err := json.Marshal(s)

if err != nil {
panic(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func getBatchTesterParams(args []string) (batchTesterParams, error) {
fs.IntVar(&params.paddingBytes, "padding", 1024, "Extra number of bytes to add into each message body")
fs.StringVar(&params.partitionID, "partition", "0", "Partition ID to send and receive events to")
fs.IntVar(&params.maxDeadlineExceeded, "maxtimeouts", 10, "Number of consecutive receive timeouts allowed before quitting")
enableVerboseLoggingFn := addVerboseLoggingFlag(fs)
enableVerboseLoggingFn := addVerboseLoggingFlag(fs, nil)

sleepAfterFn := addSleepAfterFlag(fs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ package tests
import (
"context"
"flag"
"fmt"
golog "log"
"os"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
Expand All @@ -15,42 +18,62 @@ import (
func MultiBalanceTester(ctx context.Context) error {
fs := flag.NewFlagSet("", flag.ContinueOnError)

enableVerboseLoggingFn := addVerboseLoggingFlag(fs)
rounds := fs.Int("rounds", 1, "Number of rounds to run")

if err := fs.Parse(os.Args[2:]); err != nil {
return err
}

enableVerboseLoggingFn()
ch := make(chan string, 10000)

log.SetEvents(EventBalanceTest, azeventhubs.EventConsumer)
log.SetListener(func(e log.Event, s string) {
if e == azeventhubs.EventConsumer &&
!strings.Contains(s, "Asked for") {
return
}

ch <- fmt.Sprintf("[%s] %s", e, s)
})

go func() {
for {
select {
case s := <-ch:
golog.Println(s)
case <-ctx.Done():
break
}
}
}()

for i := 0; i < *rounds; i++ {
testData := []struct {
Processors int
Strategy azeventhubs.ProcessorStrategy
}{
// {32, azeventhubs.ProcessorStrategyGreedy},
// {31, azeventhubs.ProcessorStrategyGreedy},
// {16, azeventhubs.ProcessorStrategyGreedy},
// {5, azeventhubs.ProcessorStrategyGreedy},
// {1, azeventhubs.ProcessorStrategyGreedy},

// {32, azeventhubs.ProcessorStrategyBalanced},
// {31, azeventhubs.ProcessorStrategyBalanced},
// {16, azeventhubs.ProcessorStrategyBalanced},
{32, azeventhubs.ProcessorStrategyGreedy},
{31, azeventhubs.ProcessorStrategyGreedy},
{16, azeventhubs.ProcessorStrategyGreedy},
{5, azeventhubs.ProcessorStrategyGreedy},
{1, azeventhubs.ProcessorStrategyGreedy},

{32, azeventhubs.ProcessorStrategyBalanced},
{31, azeventhubs.ProcessorStrategyBalanced},
{16, azeventhubs.ProcessorStrategyBalanced},
{5, azeventhubs.ProcessorStrategyBalanced},
// {1, azeventhubs.ProcessorStrategyBalanced},
{1, azeventhubs.ProcessorStrategyBalanced},
}

for _, td := range testData {
log.Writef(EventBalanceTest, "----- BEGIN[%d]: %s, %d processors -----", *rounds, td.Strategy, td.Processors)
log.Writef(EventBalanceTest, "----- BEGIN[%d]: %s, %d processors -----", i, td.Strategy, td.Processors)

if err := balanceTesterImpl(ctx, td.Processors, td.Strategy); err != nil {
log.Writef(EventBalanceTest, "----- END[%d]: FAIL: %s, %d processors, %s -----", *rounds, td.Strategy, td.Processors, err)
log.Writef(EventBalanceTest, "----- END[%d]: FAIL: %s, %d processors, %s -----", i, td.Strategy, td.Processors, err)
return err
}

log.Writef(EventBalanceTest, "----- END[%d]: %s, %d processors -----", *rounds, td.Strategy, td.Processors)
log.Writef(EventBalanceTest, "----- END[%d]: %s, %d processors -----", i, td.Strategy, td.Processors)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func newProcessorStressTest(args []string) (*processorStressTest, error) {
eventsPerRound := fs.Int("send", 5000, "Number of events to send per round")
rounds := fs.Int64("rounds", 100, "Number of rounds. -1 means math.MaxInt64")
prefetch := fs.Int("prefetch", 0, "Number of events to set for the prefetch. Negative numbers disable prefetch altogether. 0 uses the default for the package.")
enableVerboseLoggingFn := addVerboseLoggingFlag(fs)
enableVerboseLoggingFn := addVerboseLoggingFlag(fs, nil)
sleepAfterFn := addSleepAfterFlag(fs)

if err := fs.Parse(args); err != nil {
Expand Down
26 changes: 14 additions & 12 deletions sdk/messaging/azeventhubs/internal/eh/stress/tests/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,28 +397,30 @@ func addSleepAfterFlag(fs *flag.FlagSet) func() {
}
}

func addVerboseLoggingFlag(fs *flag.FlagSet) func() {
func addVerboseLoggingFlag(fs *flag.FlagSet, customLogFn func(verbose string, e azlog.Event, s string)) func() {
verbose := fs.String("v", "", "Enable verbose SDK logging. Valid values are test or sdk or all")

logFn := func(e azlog.Event, s string) {
log.Printf("[%s] %s", e, s)
}

if customLogFn != nil {
logFn = func(e azlog.Event, s string) {
customLogFn(*verbose, e, s)
}
}

return func() {
switch *verbose {
case "":
case "test":
azlog.SetEvents(EventBalanceTest)

azlog.SetListener(func(e azlog.Event, s string) {
log.Printf("[%s] %s", e, s)
})
azlog.SetListener(logFn)
case "sdk":
azlog.SetEvents(EventBalanceTest, azeventhubs.EventConsumer, azeventhubs.EventProducer)

azlog.SetListener(func(e azlog.Event, s string) {
log.Printf("[%s] %s", e, s)
})
azlog.SetListener(logFn)
case "all":
azlog.SetListener(func(e azlog.Event, s string) {
log.Printf("[%s] %s", e, s)
})
azlog.SetListener(logFn)
default:
fmt.Printf("%s is not a valid logging value. Valid values are test or sdk or all", *verbose)
}
Expand Down
Loading

0 comments on commit f99199e

Please sign in to comment.