Skip to content

Commit

Permalink
feat: add safe_channel pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-ywliu committed Oct 16, 2024
1 parent 2602d00 commit 5698cc6
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
32 changes: 32 additions & 0 deletions safe_channel/safe_channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package safe_channel

import (
"sync/atomic"
)

type SafeCh[T any] struct {
closed atomic.Bool
ch chan T
}

func NewSafeCh[T any](size int) *SafeCh[T] {
return &SafeCh[T]{
ch: make(chan T, size),
}
}

func (c *SafeCh[T]) Send(e T) {
if c.closed.CompareAndSwap(false, false) {
c.ch <- e
}
}

func (c *SafeCh[T]) GetRcvChan() <-chan T {
return c.ch
}

func (c *SafeCh[T]) Close() {
if c.closed.CompareAndSwap(false, true) {
close(c.ch)
}
}
42 changes: 42 additions & 0 deletions safe_channel/safe_channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package safe_channel

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// Simulate 1 receiver and N(N=2) senders situation
func TestSafeChannel(t *testing.T) {
sCh := NewSafeCh[int](1)
wg := sync.WaitGroup{}

// Two senders
wg.Add(2)
for i := 0; i < 2; i++ {
go func(i int) {
if i == 0 {
// Case: send after sCh closed
time.Sleep(1 * time.Second)

require.Equal(t, sCh.closed.Load(), true)
sCh.Send(1) // No panic
require.Equal(t, sCh.closed.Load(), true)
} else {
// Case: send success
sCh.Send(1)
require.Equal(t, sCh.closed.Load(), false)
}
wg.Done()
}(i)
}

// One receiver
<-sCh.GetRcvChan()
sCh.Close()
require.Equal(t, sCh.closed.Load(), true)

wg.Wait()
}

0 comments on commit 5698cc6

Please sign in to comment.