Skip to content

Commit

Permalink
Merge pull request #6 from arg0net:notifier-update
Browse files Browse the repository at this point in the history
add StatefulNotifier.Update method
  • Loading branch information
vgough authored Sep 17, 2024
2 parents 8cb9281 + 45b5ebd commit efb01b4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
14 changes: 14 additions & 0 deletions notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func (n *StatefulNotifier[T]) Load() (T, <-chan struct{}) {
return n.value, n.updated
}

// Update will atomically provide the current value to the update function
// and store the result of the function.
// Note that this will call the user's function with a lock held, so
// if the function blocks, then other calls to the notifier will block.
func (n *StatefulNotifier[T]) Update(fn func(T) T) {
n.mu.Lock()
defer n.mu.Unlock()

n.value = fn(n.value)
old := n.updated
n.updated = make(chan struct{})
close(old)
}

// Wait blocks until the given condition function returns true
// or the context is canceled. It returns the value that satisfied the condition.
//
Expand Down
26 changes: 26 additions & 0 deletions notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"math/rand"
"testing"
"sync"
"time"

"github.com/stretchr/testify/require"
Expand All @@ -29,6 +30,31 @@ func TestNotifier(t *testing.T) {
require.Equal(t, 4, v)
}

func TestNotifierUpdate(t *testing.T) {
sn := collections.NewStatefulNotifier(0)
start := make(chan struct{})

incr := func(in int) int {
return in+1
}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-start:
}
sn.Update(incr)
}()
}
close(start)

wg.Wait()
v, _ := sn.Load()
require.Equal(t, 10, v)
}

func TestNotifierWait(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit efb01b4

Please sign in to comment.