Skip to content

Commit

Permalink
include test and address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmood Ali committed Jan 28, 2020
1 parent 2810bf3 commit 0912400
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 13 deletions.
25 changes: 12 additions & 13 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func (s *Server) monitorLeadership() {
var leaderLoop sync.WaitGroup

leaderStep := func(isLeader bool) {
switch {
case isLeader:
if isLeader {
if weAreLeaderCh != nil {
s.logger.Error("attempted to start the leader loop while running")
return
Expand All @@ -75,19 +74,19 @@ func (s *Server) monitorLeadership() {
s.leaderLoop(ch)
}(weAreLeaderCh)
s.logger.Info("cluster leadership acquired")
return
}

default:
if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
return
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
return
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
}

wasLeader := false
Expand Down
49 changes: 49 additions & 0 deletions nomad/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,52 @@ func TestDropButLastChannelDropsValues(t *testing.T) {

close(shutdownCh)
}

// TestDropButLastChannel_DeliversMessages asserts that last
// message is always delivered, some messages are dropped but never
// introduce new messages.
// On tight loop, receivers may get some intermediary messages.
func TestDropButLastChannel_DeliversMessages(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

sentMessages := 100
go func() {
for i := 0; i < sentMessages-1; i++ {
sourceCh <- true
}
sourceCh <- false
}()

receivedTrue, receivedFalse := 0, 0
var lastReceived *bool

RECEIVE_LOOP:
for {
select {
case v := <-dstCh:
lastReceived = &v
if v {
receivedTrue++
} else {
receivedFalse++
}

case <-time.After(timeoutDuration):
break RECEIVE_LOOP
}
}

t.Logf("receiver got %v out %v true messages, and %v out of %v false messages",
receivedTrue, sentMessages-1, receivedFalse, 1)

require.NotNil(t, lastReceived)
require.False(t, *lastReceived)
require.Equal(t, 1, receivedFalse)
require.LessOrEqual(t, receivedTrue, sentMessages-1)
}

0 comments on commit 0912400

Please sign in to comment.