Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Nomad leadership flapping (attempt 2) #6977

Merged
merged 6 commits into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 50 additions & 27 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,60 @@ var defaultSchedulerConfig = &structs.SchedulerConfiguration{
func (s *Server) monitorLeadership() {
var weAreLeaderCh chan struct{}
var leaderLoop sync.WaitGroup
for {
select {
case isLeader := <-s.leaderCh:
switch {
case isLeader:
if weAreLeaderCh != nil {
s.logger.Error("attempted to start the leader loop while running")
continue
}

weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
go func(ch chan struct{}) {
defer leaderLoop.Done()
s.leaderLoop(ch)
}(weAreLeaderCh)
s.logger.Info("cluster leadership acquired")

default:
if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
continue
}
leaderStep := func(isLeader bool) {
switch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this has been pulled out of the select, could this switch be a if isLeader with an early return?

case isLeader:
if weAreLeaderCh != nil {
s.logger.Error("attempted to start the leader loop while running")
return
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
weAreLeaderCh = make(chan struct{})
leaderLoop.Add(1)
go func(ch chan struct{}) {
defer leaderLoop.Done()
s.leaderLoop(ch)
}(weAreLeaderCh)
s.logger.Info("cluster leadership acquired")

default:
if weAreLeaderCh == nil {
s.logger.Error("attempted to stop the leader loop while not running")
return
}

s.logger.Debug("shutting down leader loop")
close(weAreLeaderCh)
leaderLoop.Wait()
weAreLeaderCh = nil
s.logger.Info("cluster leadership lost")
}
}

wasLeader := false
for {
select {
case isLeader := <-s.leaderCh:
if wasLeader != isLeader {
wasLeader = isLeader
// normal case where we went through a transition
leaderStep(isLeader)
} else if wasLeader && isLeader {
// Server lost but then gained leadership immediately.
// During this time, this server may have received
// Raft transitions that haven't been applied to the FSM
// yet.
// Ensure that that FSM caught up and eval queues are refreshed
s.logger.Error("cluster leadership flapped, lost and gained leadership immediately. Leadership flaps indicate a cluster wide problems (e.g. networking).")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this Warn since there's no directly actionable error.


leaderStep(false)
leaderStep(true)
} else {
// Server gained but lost leadership immediately
// before it reacted; nothing to do, move on
s.logger.Error("cluster leadership flapped, gained and lost leadership immediately. Leadership flaps indicate a cluster wide problems (e.g. networking).")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be more accurate? I'm not sure gaining-and-losing-before-establishing leadership indicates a clusterwide problem.

Suggested change
s.logger.Error("cluster leadership flapped, gained and lost leadership immediately. Leadership flaps indicate a cluster wide problems (e.g. networking).")
s.logger.Warn("cluster leadership gained and lost. Could indicate network issues, memory paging, or high CPU load.")

AFAICT we only use the defaults for leadership election (1s timeout causes an election, elections have 1s timeout) and don't expose them for customizing. It seems like clusters without demanding scheduler throughput or latency may prefer higher timeouts to reduce elections (and therefore flapping).

If we allowed Raft to be configurable we could at least point users toward those docs when these cases are encountered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: configuration - agreed - we should consider porting consul's raft_multipler [1] and study their defaults. In some Consul testing, they increased their default leadership timeout to 5 seconds to account for low powered instances (e.g. t2.medium).

[1] https://www.consul.io/docs/install/performance.html#minimum-server-requirements

}
case <-s.shutdownCh:
return
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error {
}
}

// Setup the leader channel
// Setup the leader channel; that keeps the latest leadership alone
leaderCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh
s.leaderCh = leaderCh
s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh)

// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
Expand Down
65 changes: 65 additions & 0 deletions nomad/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,68 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {

return alloc, nil
}

// dropButLastChannel returns a channel that drops all but last value from sourceCh.
//
// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
//
// This function propagates values to result quickly and drops intermediate messages
// in best effort basis. Golang scheduler may delay delivery or result in extra
// deliveries.
//
// Consider this function for example:
//
// ```
// src := make(chan bool)
// dst := dropButLastChannel(src, nil)
//
// go func() {
// src <- true
// src <- false
// }()
//
// // v can be `true` here but is very unlikely
// v := <-dst
// ```
//
func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
// buffer the most recent result
dst := make(chan bool)

go func() {
lv := false

DEQUE_SOURCE:
// wait for first message
select {
case lv = <-sourceCh:
goto ENQUEUE_DST
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary.

Suggested change
goto ENQUEUE_DST

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unnecessary indeed. I'd like to keep though just because I find it easier to see all state machine transitions in goto statements.

case <-shutdownCh:
return
}

ENQUEUE_DST:
// prioritize draining source first dequeue without blocking
for {
select {
case lv = <-sourceCh:
default:
break ENQUEUE_DST
}
}

// attempt to enqueue but keep monitoring source channel
select {
case lv = <-sourceCh:
goto ENQUEUE_DST
case dst <- lv:
// enqueued value; back to dequeing from source
goto DEQUE_SOURCE
case <-shutdownCh:
return
}
}()

return dst

}
72 changes: 72 additions & 0 deletions nomad/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net"
"reflect"
"testing"
"time"

version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -258,3 +259,74 @@ func TestMaxUint64(t *testing.T) {
t.Fatalf("bad")
}
}

func TestDropButLastChannelDropsValues(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})

dstCh := dropButLastChannel(sourceCh, shutdownCh)

// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond

// test that dstCh doesn't emit anything initially
select {
case <-dstCh:
require.Fail(t, "received a message unexpectedly")
case <-time.After(timeoutDuration):
// yay no message - it could have been a default: but
// checking for goroutine effect
}

sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}

// channel is drained now
select {
case v := <-dstCh:
require.Failf(t, "received a message unexpectedly", "value: %v", v)
case <-time.After(timeoutDuration):
// yay no message - it could have been a default: but
// checking for goroutine effect
}

// now enqueue many messages and ensure only last one is received
// enqueueing should be fast!
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- true

// I suspect that dstCh may contain a stale (i.e. `false`) value if golang executes
// this select before the implementation goroutine dequeues last value.
//
// However, never got it to fail in test - so leaving it now to see if it ever fails;
// and if/when test fails, we can learn of how much of an issue it is and adjust
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can fail it pretty trivially if we push the values into the channel concurrently to the test thread, but I'm not sure that tells us anything other than we didn't get a chance to consume everything on the channel. If we pull all the values off, we're fine:

package main

import (
	"fmt"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestDropButLastChannel(t *testing.T) {

	testFunc := func(t *testing.T) {
		t.Parallel()
		shutdownCh := make(chan struct{})

		src := make(chan bool)
		dst := dropButLastChannel(src, shutdownCh)

		timeoutDuration := 1 * time.Millisecond

		go func() {
			src <- false
			src <- false
			src <- false
			src <- false
			src <- false
			src <- false
			src <- true
			src <- false
			src <- true
		}()

		var v bool
	BREAK:
		for {
			select {
			case v = <-dst:
				fmt.Println("ok")
			case <-time.After(timeoutDuration):
				break BREAK
			}
		}

		assert.True(t, v)
		close(shutdownCh)
	}

	for i := 0; i < 1000; i++ {
		t.Run(fmt.Sprintf("test-%d", i), testFunc)
	}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct - if it's running on different goroutine, we have no guarantees of delivery. This feels like another test to add.

Here, I wanted to test that intermediate messages get sent but get dropped when no receive is happening on the channel - so I made the sends happen in the same goroutine. Though, in current form, we still cannot 100% guarantee that the first message we receive is the last sent message, but this hasn't happened in practice yet, hence my comment.

Your test is good to have in that we should check that ultimately, we always send the last message last.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW - seeing your test, I realized I didn't support close source channel (raft doesn't attempt to close notify channel); I updated function to handle close signal by always attempting to deliver last known value, just in case someone adopt function for something else in future.

select {
case v := <-dstCh:
require.True(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}

sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}

close(shutdownCh)
}