Skip to content

Commit

Permalink
Add test demonstrating independent group contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
invisiblefunnel committed Jan 8, 2025
1 parent 6c64f84 commit a529dbf
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions egr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"sort"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -136,6 +137,102 @@ func TestGoPushWait(t *testing.T) {
}
}

func TestIndependentContexts(t *testing.T) {
nItems := 10
nWorkers := 2
queueSize := nItems
counter := &atomic.Int32{}

ctx := context.Background()
a, ctxa := egr.WithContext[int](ctx, queueSize)
b, ctxb := egr.WithContext[int](ctx, queueSize)
c, ctxc := egr.WithContext[int](ctx, queueSize)

for i := 0; i < nItems; i++ {
err := a.Push(ctxa, i)
if err != nil {
t.Errorf("unexpected error returned from Push: %v", err)
}
}

for i := 0; i < nWorkers; i++ {
a.Go(func(queue <-chan int) error {
for item := range queue {
if err := b.Push(ctxb, item); err != nil {
return err
}
}
return nil
})
}

err := a.Wait()
if err != nil {
t.Errorf("unexpected error return from Wait: %v", err)
}

// Only a's context is canceled by Wait,
// we can still add goroutines to b and
// Push to c.
select {
case <-ctxa.Done():
case <-ctxb.Done():
t.Errorf("unexpected context cancellation: %v", ctxb.Err())
case <-ctxc.Done():
t.Errorf("unexpected context cancellation: %v", ctxc.Err())
default:
t.Error("expected a's context to be canceled")
}

for i := 0; i < nWorkers; i++ {
b.Go(func(queue <-chan int) error {
for item := range queue {
if err := c.Push(ctxc, item); err != nil {
return err
}
}
return nil
})
}

err = b.Wait()
if err != nil {
t.Errorf("unexpected error return from Wait: %v", err)
}

select {
case <-ctxb.Done():
case <-ctxc.Done():
t.Errorf("unexpected context cancellation: %v", ctxc.Err())
default:
t.Error("expected b's context to be canceled")
}

for i := 0; i < nWorkers; i++ {
c.Go(func(queue <-chan int) error {
for range queue {
counter.Add(1)
}
return nil
})
}

err = c.Wait()
if err != nil {
t.Errorf("unexpected error return from Wait: %v", err)
}

select {
case <-ctxc.Done():
default:
t.Error("expected c's context to be canceled")
}

if counter.Load() != int32(nItems) {
t.Errorf("expected %d items counted, got %d", nItems, counter.Load())
}
}

// BenchmarkGo measures overhead of spawning goroutines in egr.Group.
func BenchmarkGo(b *testing.B) {
ctx := context.Background()
Expand Down

0 comments on commit a529dbf

Please sign in to comment.