diff --git a/nomad/leader.go b/nomad/leader.go index ec406a4b4b1..4a85613ee47 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -59,37 +59,59 @@ var defaultSchedulerConfig = &structs.SchedulerConfiguration{ func (s *Server) monitorLeadership() { var weAreLeaderCh chan struct{} var leaderLoop sync.WaitGroup + + leaderStep := func(isLeader bool) { + if isLeader { + if weAreLeaderCh != nil { + s.logger.Error("attempted to start the leader loop while running") + return + } + + weAreLeaderCh = make(chan struct{}) + leaderLoop.Add(1) + go func(ch chan struct{}) { + defer leaderLoop.Done() + s.leaderLoop(ch) + }(weAreLeaderCh) + s.logger.Info("cluster leadership acquired") + return + } + + if weAreLeaderCh == nil { + s.logger.Error("attempted to stop the leader loop while not running") + return + } + + s.logger.Debug("shutting down leader loop") + close(weAreLeaderCh) + leaderLoop.Wait() + weAreLeaderCh = nil + s.logger.Info("cluster leadership lost") + } + + wasLeader := false for { select { case isLeader := <-s.leaderCh: - switch { - case isLeader: - if weAreLeaderCh != nil { - s.logger.Error("attempted to start the leader loop while running") - continue - } - - weAreLeaderCh = make(chan struct{}) - leaderLoop.Add(1) - go func(ch chan struct{}) { - defer leaderLoop.Done() - s.leaderLoop(ch) - }(weAreLeaderCh) - s.logger.Info("cluster leadership acquired") - - default: - if weAreLeaderCh == nil { - s.logger.Error("attempted to stop the leader loop while not running") - continue - } - - s.logger.Debug("shutting down leader loop") - close(weAreLeaderCh) - leaderLoop.Wait() - weAreLeaderCh = nil - s.logger.Info("cluster leadership lost") + if wasLeader != isLeader { + wasLeader = isLeader + // normal case where we went through a transition + leaderStep(isLeader) + } else if wasLeader && isLeader { + // Server lost but then gained leadership immediately. + // During this time, this server may have received + // Raft transitions that haven't been applied to the FSM + // yet. + // Ensure that that FSM caught up and eval queues are refreshed + s.logger.Warn("cluster leadership lost and gained leadership immediately. Could indicate network issues, memory paging, or high CPU load.") + + leaderStep(false) + leaderStep(true) + } else { + // Server gained but lost leadership immediately + // before it reacted; nothing to do, move on + s.logger.Warn("cluster leadership gained and lost leadership immediately. Could indicate network issues, memory paging, or high CPU load.") } - case <-s.shutdownCh: return } diff --git a/nomad/server.go b/nomad/server.go index a902beb41ac..6b7765edfa1 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error { } } - // Setup the leader channel + // Setup the leader channel; that keeps the latest leadership alone leaderCh := make(chan bool, 1) s.config.RaftConfig.NotifyCh = leaderCh - s.leaderCh = leaderCh + s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh) // Setup the Raft store s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans) diff --git a/nomad/util.go b/nomad/util.go index e2772a73c70..8b8594661df 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -301,3 +301,96 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) { return alloc, nil } + +// dropButLastChannel returns a channel that drops all but last value from sourceCh. +// +// Useful for aggressively consuming sourceCh when intermediate values aren't relevant. +// +// This function propagates values to result quickly and drops intermediate messages +// in best effort basis. Golang scheduler may delay delivery or result in extra +// deliveries. +// +// Consider this function for example: +// +// ``` +// src := make(chan bool) +// dst := dropButLastChannel(src, nil) +// +// go func() { +// src <- true +// src <- false +// }() +// +// // v can be `true` here but is very unlikely +// v := <-dst +// ``` +// +func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool { + // buffer the most recent result + dst := make(chan bool) + + go func() { + // last value received + lv := false + // ok source was closed + ok := false + // received message since last delivery to destination + messageReceived := false + + DEQUE_SOURCE: + // wait for first message + select { + case lv, ok = <-sourceCh: + if !ok { + goto SOURCE_CLOSED + } + messageReceived = true + goto ENQUEUE_DST + case <-shutdownCh: + return + } + + ENQUEUE_DST: + // prioritize draining source first dequeue without blocking + for { + select { + case lv, ok = <-sourceCh: + if !ok { + goto SOURCE_CLOSED + } + messageReceived = true + default: + break ENQUEUE_DST + } + } + + // attempt to enqueue but keep monitoring source channel + select { + case lv, ok = <-sourceCh: + if !ok { + goto SOURCE_CLOSED + } + messageReceived = true + goto ENQUEUE_DST + case dst <- lv: + messageReceived = false + // enqueued value; back to dequeing from source + goto DEQUE_SOURCE + case <-shutdownCh: + return + } + + SOURCE_CLOSED: + if messageReceived { + select { + case dst <- lv: + case <-shutdownCh: + return + } + } + close(dst) + }() + + return dst + +} diff --git a/nomad/util_test.go b/nomad/util_test.go index b1df2e52344..bb6f68620d0 100644 --- a/nomad/util_test.go +++ b/nomad/util_test.go @@ -4,6 +4,7 @@ import ( "net" "reflect" "testing" + "time" version "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/helper/uuid" @@ -258,3 +259,176 @@ func TestMaxUint64(t *testing.T) { t.Fatalf("bad") } } + +func TestDropButLastChannelDropsValues(t *testing.T) { + sourceCh := make(chan bool) + shutdownCh := make(chan struct{}) + defer close(shutdownCh) + + dstCh := dropButLastChannel(sourceCh, shutdownCh) + + // timeout duration for any channel propagation delay + timeoutDuration := 5 * time.Millisecond + + // test that dstCh doesn't emit anything initially + select { + case <-dstCh: + require.Fail(t, "received a message unexpectedly") + case <-time.After(timeoutDuration): + // yay no message - it could have been a default: but + // checking for goroutine effect + } + + sourceCh <- false + select { + case v := <-dstCh: + require.False(t, v, "unexpected value from dstCh Ch") + case <-time.After(timeoutDuration): + require.Fail(t, "timed out waiting for source->dstCh propagation") + } + + // channel is drained now + select { + case v := <-dstCh: + require.Failf(t, "received a message unexpectedly", "value: %v", v) + case <-time.After(timeoutDuration): + // yay no message - it could have been a default: but + // checking for goroutine effect + } + + // now enqueue many messages and ensure only last one is received + // enqueueing should be fast! + sourceCh <- false + sourceCh <- false + sourceCh <- false + sourceCh <- false + sourceCh <- true + + // I suspect that dstCh may contain a stale (i.e. `false`) value if golang executes + // this select before the implementation goroutine dequeues last value. + // + // However, never got it to fail in test - so leaving it now to see if it ever fails; + // and if/when test fails, we can learn of how much of an issue it is and adjust + select { + case v := <-dstCh: + require.True(t, v, "unexpected value from dstCh Ch") + case <-time.After(timeoutDuration): + require.Fail(t, "timed out waiting for source->dstCh propagation") + } + + sourceCh <- true + sourceCh <- true + sourceCh <- true + sourceCh <- true + sourceCh <- true + sourceCh <- false + select { + case v := <-dstCh: + require.False(t, v, "unexpected value from dstCh Ch") + case <-time.After(timeoutDuration): + require.Fail(t, "timed out waiting for source->dstCh propagation") + } +} + +// TestDropButLastChannel_DeliversMessages asserts that last +// message is always delivered, some messages are dropped but never +// introduce new messages. +// On tight loop, receivers may get some intermediary messages. +func TestDropButLastChannel_DeliversMessages(t *testing.T) { + sourceCh := make(chan bool) + shutdownCh := make(chan struct{}) + defer close(shutdownCh) + + dstCh := dropButLastChannel(sourceCh, shutdownCh) + + // timeout duration for any channel propagation delay + timeoutDuration := 5 * time.Millisecond + + sentMessages := 100 + go func() { + for i := 0; i < sentMessages-1; i++ { + sourceCh <- true + } + sourceCh <- false + }() + + receivedTrue, receivedFalse := 0, 0 + var lastReceived *bool + +RECEIVE_LOOP: + for { + select { + case v := <-dstCh: + lastReceived = &v + if v { + receivedTrue++ + } else { + receivedFalse++ + } + + case <-time.After(timeoutDuration): + break RECEIVE_LOOP + } + } + + t.Logf("receiver got %v out %v true messages, and %v out of %v false messages", + receivedTrue, sentMessages-1, receivedFalse, 1) + + require.NotNil(t, lastReceived) + require.False(t, *lastReceived) + require.Equal(t, 1, receivedFalse) + require.LessOrEqual(t, receivedTrue, sentMessages-1) +} + +// TestDropButLastChannel_DeliversMessages_Close asserts that last +// message is always delivered, some messages are dropped but never +// introduce new messages, even with a closed signal. +func TestDropButLastChannel_DeliversMessages_Close(t *testing.T) { + sourceCh := make(chan bool) + shutdownCh := make(chan struct{}) + defer close(shutdownCh) + + dstCh := dropButLastChannel(sourceCh, shutdownCh) + + // timeout duration for any channel propagation delay + timeoutDuration := 5 * time.Millisecond + + sentMessages := 100 + go func() { + for i := 0; i < sentMessages-1; i++ { + sourceCh <- true + } + sourceCh <- false + close(sourceCh) + }() + + receivedTrue, receivedFalse := 0, 0 + var lastReceived *bool + +RECEIVE_LOOP: + for { + select { + case v, ok := <-dstCh: + if !ok { + break RECEIVE_LOOP + } + lastReceived = &v + if v { + receivedTrue++ + } else { + receivedFalse++ + } + + case <-time.After(timeoutDuration): + require.Fail(t, "timed out while waiting for messages") + } + } + + t.Logf("receiver got %v out %v true messages, and %v out of %v false messages", + receivedTrue, sentMessages-1, receivedFalse, 1) + + require.NotNil(t, lastReceived) + require.False(t, *lastReceived) + require.Equal(t, 1, receivedFalse) + require.LessOrEqual(t, receivedTrue, sentMessages-1) +}