Skip to content

Commit

Permalink
handle channel close signal
Browse files Browse the repository at this point in the history
Always deliver last value then send close signal.
  • Loading branch information
Mahmood Ali committed Jan 28, 2020
1 parent 0912400 commit 97f20bd
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 3 deletions.
34 changes: 31 additions & 3 deletions nomad/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,21 @@ func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan b
dst := make(chan bool)

go func() {
// last value received
lv := false
// ok source was closed
ok := false
// received message since last delivery to destination
messageReceived := false

DEQUE_SOURCE:
// wait for first message
select {
case lv = <-sourceCh:
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
goto ENQUEUE_DST
case <-shutdownCh:
return
Expand All @@ -345,22 +354,41 @@ func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan b
// prioritize draining source first dequeue without blocking
for {
select {
case lv = <-sourceCh:
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
default:
break ENQUEUE_DST
}
}

// attempt to enqueue but keep monitoring source channel
select {
case lv = <-sourceCh:
case lv, ok = <-sourceCh:
if !ok {
goto SOURCE_CLOSED
}
messageReceived = true
goto ENQUEUE_DST
case dst <- lv:
messageReceived = false
// enqueued value; back to dequeing from source
goto DEQUE_SOURCE
case <-shutdownCh:
return
}

SOURCE_CLOSED:
if messageReceived {
select {
case dst <- lv:
case <-shutdownCh:
return
}
}
close(dst)
}()

return dst
Expand Down
52 changes: 52 additions & 0 deletions nomad/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,55 @@ RECEIVE_LOOP:
require.Equal(t, 1, receivedFalse)
require.LessOrEqual(t, receivedTrue, sentMessages-1)
}

// TestDropButLastChannel_DeliversMessages_Close asserts that last
// message is always delivered, some messages are dropped but never
// introduce new messages, even with a closed signal.
func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

sentMessages := 100
go func() {
for i := 0; i < sentMessages-1; i++ {
sourceCh <- true
}
sourceCh <- false
close(sourceCh)
}()

receivedTrue, receivedFalse := 0, 0
var lastReceived *bool

RECEIVE_LOOP:
for {
select {
case v, ok := <-dstCh:
if !ok {
break RECEIVE_LOOP
}
lastReceived = &v
if v {
receivedTrue++
} else {
receivedFalse++
}

case <-time.After(timeoutDuration):
require.Fail(t, "timed out while waiting for messages")
}
}

t.Logf("receiver got %v out %v true messages, and %v out of %v false messages",
receivedTrue, sentMessages-1, receivedFalse, 1)

require.NotNil(t, lastReceived)
require.False(t, *lastReceived)
require.Equal(t, 1, receivedFalse)
require.LessOrEqual(t, receivedTrue, sentMessages-1)
}

0 comments on commit 97f20bd

Please sign in to comment.