Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Heartbeat improvements and handling failures during establishing leadership #3890

Merged
merged 9 commits into from
Mar 12, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,7 @@ func (c *Client) registerNode() error {
// Update the node status to ready after we register.
c.configLock.Lock()
node.Status = structs.NodeStatusReady
c.config.Node.Status = structs.NodeStatusReady
c.configLock.Unlock()

c.logger.Printf("[INFO] client: node registration complete")
Expand Down
8 changes: 8 additions & 0 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,14 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
w.outstandingBatch = true

time.AfterFunc(perJobEvalBatchPeriod, func() {
// If the timer has been created and then we shutdown, we need to no-op
// the evaluation creation.
select {
case <-w.ctx.Done():
return
default:
}

// Create the eval
evalCreateIndex, err := w.createEvaluation(w.getEval())
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions nomad/heartbeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"errors"
"time"

"github.com/armon/go-metrics"
Expand All @@ -9,6 +10,18 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
// heartbeatNotLeader is the error string returned when the heartbeat request
// couldn't be completed since the server is not the leader.
heartbeatNotLeader = "failed to reset heartbeat since server is not leader"
)

var (
// heartbeatNotLeaderErr is the error returned when the heartbeat request
// couldn't be completed since the server is not the leader.
heartbeatNotLeaderErr = errors.New(heartbeatNotLeader)
)

// initializeHeartbeatTimers is used when a leader is newly elected to create
// a new map to track heartbeat expiration and to reset all the timers from
// the previously known set of timers.
Expand Down Expand Up @@ -50,6 +63,13 @@ func (s *Server) resetHeartbeatTimer(id string) (time.Duration, error) {
s.heartbeatTimersLock.Lock()
defer s.heartbeatTimersLock.Unlock()

// Do not create a timer for the node since we are not the leader. This
// check avoids the race in which leadership is lost but a timer is created
// on this server since it was servicing an RPC during a leadership loss.
if !s.IsLeader() {
return 0, heartbeatNotLeaderErr
}

// Compute the target TTL value
n := len(s.heartbeatTimers)
ttl := lib.RateScaledInterval(s.config.MaxHeartbeatsPerSecond, s.config.MinHeartbeatTTL, n)
Expand Down Expand Up @@ -89,6 +109,14 @@ func (s *Server) invalidateHeartbeat(id string) {
s.heartbeatTimersLock.Lock()
delete(s.heartbeatTimers, id)
s.heartbeatTimersLock.Unlock()

// Do not invalidate the node since we are not the leader. This check avoids
// the race in which leadership is lost but a timer is created on this
// server since it was servicing an RPC during a leadership loss.
if !s.IsLeader() {
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log a message here, just to indicate that this happened?

}

s.logger.Printf("[WARN] nomad.heartbeat: node '%s' TTL expired", id)

// Make a request to update the node status
Expand Down
40 changes: 30 additions & 10 deletions nomad/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestInitializeHeartbeatTimers(t *testing.T) {
func TestHeartbeat_InitializeHeartbeatTimers(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand All @@ -38,7 +39,7 @@ func TestInitializeHeartbeatTimers(t *testing.T) {
}
}

func TestResetHeartbeatTimer(t *testing.T) {
func TestHeartbeat_ResetHeartbeatTimer(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand All @@ -60,7 +61,24 @@ func TestResetHeartbeatTimer(t *testing.T) {
}
}

func TestResetHeartbeatTimerLocked(t *testing.T) {
func TestHeartbeat_ResetHeartbeatTimer_Nonleader(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := testServer(t, func(c *Config) {
c.BootstrapExpect = 3 // Won't become leader
c.DevDisableBootstrap = true
})
defer s1.Shutdown()

require.False(s1.IsLeader())

// Create a new timer
_, err := s1.resetHeartbeatTimer("test")
require.NotNil(err)
require.EqualError(err, heartbeatNotLeader)
}

func TestHeartbeat_ResetHeartbeatTimerLocked(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand All @@ -81,7 +99,7 @@ func TestResetHeartbeatTimerLocked(t *testing.T) {
}
}

func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -120,7 +138,7 @@ func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
t.Fatalf("should have expired")
}

func TestInvalidateHeartbeat(t *testing.T) {
func TestHeartbeat_InvalidateHeartbeat(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -148,7 +166,7 @@ func TestInvalidateHeartbeat(t *testing.T) {
}
}

func TestClearHeartbeatTimer(t *testing.T) {
func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand All @@ -168,7 +186,7 @@ func TestClearHeartbeatTimer(t *testing.T) {
}
}

func TestClearAllHeartbeatTimers(t *testing.T) {
func TestHeartbeat_ClearAllHeartbeatTimers(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand All @@ -190,7 +208,7 @@ func TestClearAllHeartbeatTimers(t *testing.T) {
}
}

func TestServer_HeartbeatTTL_Failover(t *testing.T) {
func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -253,9 +271,11 @@ func TestServer_HeartbeatTTL_Failover(t *testing.T) {
leader.Shutdown()

// heartbeatTimers should be cleared on leader shutdown
if len(leader.heartbeatTimers) != 0 {
testutil.WaitForResult(func() (bool, error) {
return len(leader.heartbeatTimers) == 0, nil
}, func(err error) {
t.Fatalf("heartbeat timers should be empty on the shutdown leader")
}
})

// Find the new leader
testutil.WaitForResult(func() (bool, error) {
Expand Down
12 changes: 12 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,16 @@ RECONCILE:
if !establishedLeader {
if err := s.establishLeadership(stopCh); err != nil {
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err)

// Immediately revoke leadership since we didn't successfully
// establish leadership.
if err := s.revokeLeadership(); err != nil {
s.logger.Printf("[ERR] nomad: failed to revoke leadership: %v", err)
}

goto WAIT
}

establishedLeader = true
defer func() {
if err := s.revokeLeadership(); err != nil {
Expand Down Expand Up @@ -157,6 +165,8 @@ WAIT:
// previously inflight transactions have been committed and that our
// state is up-to-date.
func (s *Server) establishLeadership(stopCh chan struct{}) error {
defer metrics.MeasureSince([]string{"nomad", "leader", "establish_leadership"}, time.Now())

// Generate a leader ACL token. This will allow the leader to issue work
// that requires a valid ACL token.
s.setLeaderAcl(uuid.Generate())
Expand Down Expand Up @@ -639,6 +649,8 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())

// Clear the leader token since we are no longer the leader.
s.setLeaderAcl("")

Expand Down
17 changes: 17 additions & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLeader_LeftServer(t *testing.T) {
Expand Down Expand Up @@ -978,3 +979,19 @@ func TestLeader_RollRaftServer(t *testing.T) {
})
}
}

func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should have finished establish leader loop")
})

require.Nil(t, s1.revokeLeadership())
require.Nil(t, s1.revokeLeadership())
require.Nil(t, s1.revokeLeadership())
}