Skip to content

Commit

Permalink
Merge pull request #3890 from hashicorp/b-heartbeat
Browse files Browse the repository at this point in the history
Heartbeat improvements and handling failures during establishing leadership
  • Loading branch information
dadgar authored Mar 12, 2018
2 parents bc014b9 + a013782 commit 42e9fe1
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 11 deletions.
1 change: 1 addition & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,7 @@ func (c *Client) registerNode() error {
// Update the node status to ready after we register.
c.configLock.Lock()
node.Status = structs.NodeStatusReady
c.config.Node.Status = structs.NodeStatusReady
c.configLock.Unlock()

c.logger.Printf("[INFO] client: node registration complete")
Expand Down
8 changes: 8 additions & 0 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,14 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
w.outstandingBatch = true

time.AfterFunc(perJobEvalBatchPeriod, func() {
// If the timer has been created and then we shutdown, we need to no-op
// the evaluation creation.
select {
case <-w.ctx.Done():
return
default:
}

// Create the eval
evalCreateIndex, err := w.createEvaluation(w.getEval())
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions nomad/heartbeat.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"errors"
"time"

"github.com/armon/go-metrics"
Expand All @@ -9,6 +10,18 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

// heartbeatNotLeader is the message of the error reported when a heartbeat
// reset is refused because this server is not the leader.
const heartbeatNotLeader = "failed to reset heartbeat since server is not leader"

// heartbeatNotLeaderErr is the sentinel error handed back when a heartbeat
// request cannot be completed because this server is not the leader. Its
// message is the heartbeatNotLeader string.
var heartbeatNotLeaderErr = errors.New(heartbeatNotLeader)

// initializeHeartbeatTimers is used when a leader is newly elected to create
// a new map to track heartbeat expiration and to reset all the timers from
// the previously known set of timers.
Expand Down Expand Up @@ -50,6 +63,14 @@ func (s *Server) resetHeartbeatTimer(id string) (time.Duration, error) {
s.heartbeatTimersLock.Lock()
defer s.heartbeatTimersLock.Unlock()

// Do not create a timer for the node since we are not the leader. This
// check avoids the race in which leadership is lost but a timer is created
// on this server since it was servicing an RPC during a leadership loss.
if !s.IsLeader() {
s.logger.Printf("[DEBUG] nomad.heartbeat: ignoring resetting node %q TTL since this node is not the leader", id)
return 0, heartbeatNotLeaderErr
}

// Compute the target TTL value
n := len(s.heartbeatTimers)
ttl := lib.RateScaledInterval(s.config.MaxHeartbeatsPerSecond, s.config.MinHeartbeatTTL, n)
Expand Down Expand Up @@ -89,6 +110,15 @@ func (s *Server) invalidateHeartbeat(id string) {
s.heartbeatTimersLock.Lock()
delete(s.heartbeatTimers, id)
s.heartbeatTimersLock.Unlock()

// Do not invalidate the node since we are not the leader. This check avoids
// the race in which leadership is lost but a timer is created on this
// server since it was servicing an RPC during a leadership loss.
if !s.IsLeader() {
s.logger.Printf("[DEBUG] nomad.heartbeat: ignoring node %q TTL since this node is not the leader", id)
return
}

s.logger.Printf("[WARN] nomad.heartbeat: node '%s' TTL expired", id)

// Make a request to update the node status
Expand Down
40 changes: 30 additions & 10 deletions nomad/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestInitializeHeartbeatTimers(t *testing.T) {
func TestHeartbeat_InitializeHeartbeatTimers(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand All @@ -38,7 +39,7 @@ func TestInitializeHeartbeatTimers(t *testing.T) {
}
}

func TestResetHeartbeatTimer(t *testing.T) {
func TestHeartbeat_ResetHeartbeatTimer(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand All @@ -60,7 +61,24 @@ func TestResetHeartbeatTimer(t *testing.T) {
}
}

func TestResetHeartbeatTimerLocked(t *testing.T) {
func TestHeartbeat_ResetHeartbeatTimer_Nonleader(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := testServer(t, func(c *Config) {
c.BootstrapExpect = 3 // Won't become leader
c.DevDisableBootstrap = true
})
defer s1.Shutdown()

require.False(s1.IsLeader())

// Create a new timer
_, err := s1.resetHeartbeatTimer("test")
require.NotNil(err)
require.EqualError(err, heartbeatNotLeader)
}

func TestHeartbeat_ResetHeartbeatTimerLocked(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand All @@ -81,7 +99,7 @@ func TestResetHeartbeatTimerLocked(t *testing.T) {
}
}

func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
func TestHeartbeat_ResetHeartbeatTimerLocked_Renew(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -120,7 +138,7 @@ func TestResetHeartbeatTimerLocked_Renew(t *testing.T) {
t.Fatalf("should have expired")
}

func TestInvalidateHeartbeat(t *testing.T) {
func TestHeartbeat_InvalidateHeartbeat(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -148,7 +166,7 @@ func TestInvalidateHeartbeat(t *testing.T) {
}
}

func TestClearHeartbeatTimer(t *testing.T) {
func TestHeartbeat_ClearHeartbeatTimer(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand All @@ -168,7 +186,7 @@ func TestClearHeartbeatTimer(t *testing.T) {
}
}

func TestClearAllHeartbeatTimers(t *testing.T) {
func TestHeartbeat_ClearAllHeartbeatTimers(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand All @@ -190,7 +208,7 @@ func TestClearAllHeartbeatTimers(t *testing.T) {
}
}

func TestServer_HeartbeatTTL_Failover(t *testing.T) {
func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -253,9 +271,11 @@ func TestServer_HeartbeatTTL_Failover(t *testing.T) {
leader.Shutdown()

// heartbeatTimers should be cleared on leader shutdown
if len(leader.heartbeatTimers) != 0 {
testutil.WaitForResult(func() (bool, error) {
return len(leader.heartbeatTimers) == 0, nil
}, func(err error) {
t.Fatalf("heartbeat timers should be empty on the shutdown leader")
}
})

// Find the new leader
testutil.WaitForResult(func() (bool, error) {
Expand Down
12 changes: 12 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,16 @@ RECONCILE:
if !establishedLeader {
if err := s.establishLeadership(stopCh); err != nil {
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v", err)

// Immediately revoke leadership since we didn't successfully
// establish leadership.
if err := s.revokeLeadership(); err != nil {
s.logger.Printf("[ERR] nomad: failed to revoke leadership: %v", err)
}

goto WAIT
}

establishedLeader = true
defer func() {
if err := s.revokeLeadership(); err != nil {
Expand Down Expand Up @@ -157,6 +165,8 @@ WAIT:
// previously inflight transactions have been committed and that our
// state is up-to-date.
func (s *Server) establishLeadership(stopCh chan struct{}) error {
defer metrics.MeasureSince([]string{"nomad", "leader", "establish_leadership"}, time.Now())

// Generate a leader ACL token. This will allow the leader to issue work
// that requires a valid ACL token.
s.setLeaderAcl(uuid.Generate())
Expand Down Expand Up @@ -639,6 +649,8 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())

// Clear the leader token since we are no longer the leader.
s.setLeaderAcl("")

Expand Down
17 changes: 17 additions & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLeader_LeftServer(t *testing.T) {
Expand Down Expand Up @@ -979,3 +980,19 @@ func TestLeader_RollRaftServer(t *testing.T) {
})
}
}

// TestLeader_RevokeLeadership_MultipleTimes checks that revokeLeadership can
// be invoked several times in a row on a server that has established
// leadership, and that every call returns nil.
func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
	srv := testServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	// Wait until the eval broker reports enabled, which this test treats as
	// the sign that the establish-leader loop has completed.
	brokerEnabled := func() (bool, error) {
		return srv.evalBroker.Enabled(), nil
	}
	onTimeout := func(error) {
		t.Fatalf("should have finished establish leader loop")
	}
	testutil.WaitForResult(brokerEnabled, onTimeout)

	// Revoke repeatedly; each call must succeed.
	const attempts = 3
	for i := 0; i < attempts; i++ {
		require.Nil(t, srv.revokeLeadership())
	}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
{"path":"github.com/hashicorp/consul-template/template","checksumSHA1":"N9qobVzScLbTEnGE7MgFnnTbGBw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul-template/version","checksumSHA1":"NB5+D4AuCNV9Bsqh3YFdPi4AJ6U=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul-template/watch","checksumSHA1":"b4+Y+02pY2Y5620F9ALzKg8Zmdw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"/nyemJLkxBXKqI9xpLFyTyvOaYY=","revision":"bfeb09983befa337a3b2ebbafb7567913773e40b","revisionTime":"2018-01-23T20:52:17Z"},
{"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"d1ede2c93dec7b4580e37ef41d24371abab9d9e9","revisionTime":"2018-02-21T18:19:48Z"},
{"path":"github.com/hashicorp/consul/api","checksumSHA1":"XLfcIX2qpRr0o26aFMjCOzvw6jo=","revision":"51ea240df8476e02215d53fbfad5838bf0d44d21","revisionTime":"2017-10-16T16:22:40Z"},
{"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"XTQIYV+DPUVRKpVp0+y/78bWH3I=","revision":"d08ab9fd199434e5220276356ecf9617cfec1eb2","revisionTime":"2017-12-18T20:26:35Z"},
{"path":"github.com/hashicorp/consul/lib","checksumSHA1":"HGljdtVaqi/e3DgIHymLRLfPYhw=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"},
Expand Down

0 comments on commit 42e9fe1

Please sign in to comment.