Skip to content

Commit

Permalink
client/heartbeatstop: don't store client state, use timeout
Browse files Browse the repository at this point in the history
In order to minimize this change while keeping a simple version of the
behavior, we set `lastOk` to the current time less the initial server
connection timeout. If the client starts and never contacts the
server, it will stop all configured tasks after the initial server
connection grace period, on the assumption that we've been out of
touch longer than any configured `stop_after_client_disconnect`.

The more complex state behavior might be justified later, but we
should learn about failure modes first.
  • Loading branch information
langmartin committed Apr 28, 2020
1 parent 198290e commit 1da3031
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 122 deletions.
13 changes: 7 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
}, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
})

// create heartbeatStop, and restore its previous state from the state store. Post init for the stateDB
c.heartbeatStop = newHeartbeatStop(c.stateDB, c.getAllocRunner, logger, c.shutdownCh)

// Setup the clients RPC server
c.setupClientRpc()

Expand Down Expand Up @@ -448,9 +445,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
c.pluginManagers.RegisterAndRun(devManager)

// Batching of initial fingerprints is done to reduce the number of node
// updates sent to the server on startup.
// updates sent to the server on startup. This is the first RPC to the servers
go c.batchFirstFingerprints()

// Create heartbeatStop only after the first attempt to connect to the servers has been
// started, so that the startup grace period spans the full initial connection timeout
c.heartbeatStop = newHeartbeatStop(c.getAllocRunner, batchFirstFingerprintsTimeout, logger, c.shutdownCh)

// Watch for disconnection, and heartbeatStopAllocs configured to have a maximum
// lifetime when out of touch with the server
go c.heartbeatStop.watch()
Expand Down Expand Up @@ -1764,7 +1765,7 @@ func (c *Client) registerNode() error {

c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
c.heartbeatStop.setLastOk()
c.heartbeatStop.setLastOk(time.Now())
c.heartbeatTTL = resp.HeartbeatTTL
return nil
}
Expand Down Expand Up @@ -1793,7 +1794,7 @@ func (c *Client) updateNodeStatus() error {
last := c.lastHeartbeat()
oldTTL := c.heartbeatTTL
haveHeartbeated := c.haveHeartbeated
c.heartbeatStop.setLastOk()
c.heartbeatStop.setLastOk(time.Now())
c.heartbeatTTL = resp.HeartbeatTTL
c.haveHeartbeated = true
c.heartbeatLock.Unlock()
Expand Down
34 changes: 13 additions & 21 deletions client/heartbeatstop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,36 @@ import (
"time"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/nomad/structs"
)

type heartbeatStop struct {
lastOk time.Time
startupGrace time.Time
allocInterval map[string]time.Duration
allocHookCh chan *structs.Allocation
getRunner func(string) (AllocRunner, error)
logger hclog.InterceptLogger
state state.StateDB
shutdownCh chan struct{}
lock *sync.RWMutex
}

func newHeartbeatStop(
state state.StateDB,
getRunner func(string) (AllocRunner, error),
timeout time.Duration,
logger hclog.InterceptLogger,
shutdownCh chan struct{}) *heartbeatStop {

h := &heartbeatStop{
startupGrace: time.Now().Add(timeout),
allocInterval: make(map[string]time.Duration),
allocHookCh: make(chan *structs.Allocation),
getRunner: getRunner,
logger: logger,
state: state,
shutdownCh: shutdownCh,
lock: &sync.RWMutex{},
}

if state != nil {
lastOk, err := state.GetLastHeartbeatOk()
if err == nil && !lastOk.IsZero() {
h.lastOk = lastOk
}
}

return h
}

Expand All @@ -60,12 +52,18 @@ func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool {
tg := allocTaskGroup(alloc)
if tg.StopAfterClientDisconnect != nil {
now := time.Now()
return now.After(h.lastOk.Add(*tg.StopAfterClientDisconnect))
return h.shouldStopAfter(time.Now(), *tg.StopAfterClientDisconnect)
}
return false
}

// shouldStopAfter reports whether an allocation configured with the given
// stop_after_client_disconnect interval should be stopped at time now.
//
// If no heartbeat has succeeded yet (lastOk is still the zero time), the
// decision is based on the startup grace deadline alone: the allocation is
// stopped only once now is past startupGrace. Otherwise it is stopped once now
// is later than lastOk plus interval.
func (h *heartbeatStop) shouldStopAfter(now time.Time, interval time.Duration) bool {
	if h.lastOk.IsZero() {
		// The grace period has to elapse before anything is stopped. The
		// reversed comparison (startupGrace.After(now)) would stop allocations
		// during the grace window and never after it.
		return now.After(h.startupGrace)
	}
	return now.After(h.lastOk.Add(interval))
}

// watch is a loop that checks for allocations that should be stopped. It also manages the
// registration of allocs to be stopped in a single thread.
func (h *heartbeatStop) watch() {
Expand Down Expand Up @@ -116,24 +114,18 @@ func (h *heartbeatStop) watch() {

now = time.Now()
for allocID, d := range h.allocInterval {
if now.After(h.lastOk.Add(d)) {
if h.shouldStopAfter(now, d) {
stop <- allocID
}
}
}
}

// setLastOk sets the last known good heartbeat time to the current time, and persists that time to disk
func (h *heartbeatStop) setLastOk() error {
func (h *heartbeatStop) setLastOk(t time.Time) {
h.lock.Lock()
defer h.lock.Unlock()
t := time.Now()
h.lastOk = t
// We may encounter an error here, but want to update the running time
// unconditionally, since we'll actively terminate stateful tasks if it ages too
// much. We only use the state value when restarting the client itself after a
// crash, so it's better to update the runtime value and have the stored value stale
return h.state.PutLastHeartbeatOk(t)
}

func (h *heartbeatStop) getLastOk() time.Time {
Expand Down
9 changes: 1 addition & 8 deletions client/heartbeatstop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ func TestHearbeatStop_allocHook(t *testing.T) {
})
defer cleanupC1()

// last heartbeat is persisted in the state db
err := client.registerNode()
require.NoError(t, err)
last, err := client.stateDB.GetLastHeartbeatOk()
require.NoError(t, err)
require.Empty(t, last)

// an allocation, with a tiny lease
d := 1 * time.Microsecond
alloc := &structs.Allocation{
Expand All @@ -51,7 +44,7 @@ func TestHearbeatStop_allocHook(t *testing.T) {
}

// alloc added to heartbeatStop.allocs
err = client.addAlloc(alloc, "")
err := client.addAlloc(alloc, "")
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
_, ok := client.heartbeatStop.allocInterval[alloc.ID]
Expand Down
9 changes: 0 additions & 9 deletions client/state/interface.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package state

import (
"time"

"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
Expand Down Expand Up @@ -78,13 +76,6 @@ type StateDB interface {
// PutDynamicPluginRegistryState is used to store the dynamic plugin managers's state.
PutDynamicPluginRegistryState(state *dynamicplugins.RegistryState) error

// GetLastHeartbeatOk retrieves the stored last known good heartbeat
GetLastHeartbeatOk() (time.Time, error)

// PutLastHeartbeatOk stores the last heartbeat known to have made the round trip to
// the server
PutLastHeartbeatOk(time.Time) error

// Close the database. Unsafe for further use after calling regardless
// of return value.
Close() error
Expand Down
17 changes: 0 additions & 17 deletions client/state/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package state

import (
"sync"
"time"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
Expand Down Expand Up @@ -34,9 +33,6 @@ type MemDB struct {
// dynamicmanager -> registry-state
dynamicManagerPs *dynamicplugins.RegistryState

// lastHeartbeatOk -> last_heartbeat_ok
lastHeartbeatOk time.Time

logger hclog.Logger

mu sync.RWMutex
Expand Down Expand Up @@ -93,19 +89,6 @@ func (m *MemDB) PutDeploymentStatus(allocID string, ds *structs.AllocDeploymentS
return nil
}

// GetLastHeartbeatOk returns the last known good heartbeat time held in
// memory. The returned error is always nil.
func (m *MemDB) GetLastHeartbeatOk() (time.Time, error) {
	// Read-only access: a shared lock is enough and matches the other getters
	// on MemDB.
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.lastHeartbeatOk, nil
}

// PutLastHeartbeatOk records t as the last known good heartbeat time in
// memory. The returned error is always nil.
func (m *MemDB) PutLastHeartbeatOk(t time.Time) error {
	m.mu.Lock()
	m.lastHeartbeatOk = t
	m.mu.Unlock()

	return nil
}

func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) {
m.mu.RLock()
defer m.mu.RUnlock()
Expand Down
10 changes: 0 additions & 10 deletions client/state/noopdb.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package state

import (
"time"

"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
Expand Down Expand Up @@ -81,14 +79,6 @@ func (n NoopDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryState,
return nil, nil
}

// PutLastHeartbeatOk discards t; the no-op database stores nothing and always
// returns a nil error.
func (n NoopDB) PutLastHeartbeatOk(t time.Time) error {
	var err error
	return err
}

// GetLastHeartbeatOk always returns the zero time and a nil error, since the
// no-op database never stores a heartbeat.
func (n NoopDB) GetLastHeartbeatOk() (time.Time, error) {
	var never time.Time
	return never, nil
}

func (n NoopDB) Close() error {
return nil
}
51 changes: 0 additions & 51 deletions client/state/state_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,6 @@ var (

// registryStateKey is the key at which dynamic plugin registry state is stored
registryStateKey = []byte("registry_state")

// lastHeartbeatOkBucket is the bucket for storing the last known good heartbeat time
lastHeartbeatOkBucket = []byte("last_heartbeat_ok")

// lastHeartbeatOkKey is the key for the last known good heartbeat time
lastHeartbeatOkKey = []byte("last_heartbeat")
)

// taskBucketName returns the bucket name for the given task name.
Expand Down Expand Up @@ -661,51 +655,6 @@ func (s *BoltStateDB) GetDynamicPluginRegistryState() (*dynamicplugins.RegistryS
return ps, nil
}

// PutLastHeartbeatOk stores the last known good heartbeat time, as Unix
// seconds, or returns an error.
func (s *BoltStateDB) PutLastHeartbeatOk(t time.Time) error {
	return s.db.Update(func(tx *boltdd.Tx) error {
		// Retrieve (creating if needed) the last-heartbeat bucket
		heartbeatBkt, err := tx.CreateBucketIfNotExists(lastHeartbeatOkBucket)
		if err != nil {
			return err
		}

		// Only second resolution is persisted; sub-second precision is dropped.
		return heartbeatBkt.Put(lastHeartbeatOkKey, t.Unix())
	})
}

// GetLastHeartbeatOk retrieves the stored last known good heartbeat time or
// returns an error. When no heartbeat has been stored (missing bucket or
// missing key) it returns the zero time.Time, so callers can detect the
// "never stored" case with IsZero.
func (s *BoltStateDB) GetLastHeartbeatOk() (time.Time, error) {
	var unix int64
	err := s.db.View(func(tx *boltdd.Tx) error {
		heartbeatBkt := tx.Bucket(lastHeartbeatOkBucket)
		if heartbeatBkt == nil {
			// No state, return
			return nil
		}

		// Restore the last heartbeat time if it exists. The destination must
		// be a pointer so the stored value can be decoded into it.
		if err := heartbeatBkt.Get(lastHeartbeatOkKey, &unix); err != nil {
			if !boltdd.IsErrNotFound(err) {
				return fmt.Errorf("failed to read last heartbeat state: %v", err)
			}

			// Key not found, reset output to the "never stored" value
			unix = 0
		}

		return nil
	})

	if err != nil {
		return time.Time{}, err
	}

	// time.Unix(0, 0) is the Unix epoch, not the zero time.Time, so map the
	// "nothing stored" case explicitly to keep IsZero checks meaningful.
	if unix == 0 {
		return time.Time{}, nil
	}

	return time.Unix(unix, 0), nil
}

// init initializes metadata entries in a newly created state database.
func (s *BoltStateDB) init() error {
return s.db.Update(func(tx *boltdd.Tx) error {
Expand Down

0 comments on commit 1da3031

Please sign in to comment.