Skip to content

Commit

Permalink
Merge pull request #5585 from hashicorp/b-drivers-node-registration
Browse files Browse the repository at this point in the history
client: wait for batched driver updates before registering nodes
  • Loading branch information
notnoop authored Apr 19, 2019
2 parents 92a4033 + 8041b0c commit 9050f5f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 16 deletions.
23 changes: 15 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ const (
allocSyncRetryIntv = 5 * time.Second
)

var (
// grace period to allow for batch fingerprint processing
batchFirstFingerprintsProcessingGrace = batchFirstFingerprintsTimeout + 5*time.Second
)

// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
// Client
type ClientStatsReporter interface {
Expand Down Expand Up @@ -419,6 +424,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
return nil, fmt.Errorf("failed to setup vault client: %v", err)
}

// wait until drivers are healthy before restoring or registering with servers
select {
case <-c.Ready():
case <-time.After(batchFirstFingerprintsProcessingGrace):
logger.Warn("batch fingerprint operation timed out; proceeding to register with fingerprinted plugins so far")
}

// Restore the state
if err := c.restoreState(); err != nil {
logger.Error("failed to restore state", "error", err)
Expand Down Expand Up @@ -1456,13 +1468,7 @@ func (c *Client) watchNodeEvents() {
// batchEvents stores events that have yet to be published
var batchEvents []*structs.NodeEvent

// Create and drain the timer
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
}
timer := stoppedTimer()
defer timer.Stop()

for {
Expand Down Expand Up @@ -1918,7 +1924,8 @@ func (c *Client) updateNodeLocked() {
// it will update the client node copy and re-register the node.
func (c *Client) watchNodeUpdates() {
var hasChanged bool
timer := time.NewTimer(c.retryIntv(nodeUpdateRetryIntv))

timer := stoppedTimer()
defer timer.Stop()

for {
Expand Down
2 changes: 1 addition & 1 deletion client/node_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var (
// batchFirstFingerprintsTimeout is the maximum amount of time to wait for
// initial fingerprinting to complete before sending a batched Node update
batchFirstFingerprintsTimeout = 5 * time.Second
batchFirstFingerprintsTimeout = 50 * time.Second
)

// batchFirstFingerprints waits for the first fingerprint response from all
Expand Down
20 changes: 13 additions & 7 deletions client/pluginmanager/drivermanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,26 +243,32 @@ func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.Ca
}

var mu sync.Mutex
var availDrivers []string
driversByStatus := map[drivers.HealthState][]string{}

var wg sync.WaitGroup

recordDriver := func(name string, lastHeath drivers.HealthState) {
mu.Lock()
defer mu.Unlock()

updated := append(driversByStatus[lastHeath], name)
driversByStatus[lastHeath] = updated
}

// loop through instances and wait for each to finish initial fingerprint
m.instancesMu.RLock()
for n, i := range m.instances {
wg.Add(1)
go func(name string, instance *instanceManager) {
defer wg.Done()
instance.WaitForFirstFingerprint(ctx)
if instance.getLastHealth() != drivers.HealthStateUndetected {
mu.Lock()
availDrivers = append(availDrivers, name)
mu.Unlock()
}
recordDriver(name, instance.getLastHealth())
}(n, i)
}
m.instancesMu.RUnlock()
wg.Wait()
m.logger.Debug("detected drivers", "drivers", availDrivers)

m.logger.Debug("detected drivers", "drivers", driversByStatus)
}

func (m *manager) loadReattachConfigs() error {
Expand Down
11 changes: 11 additions & 0 deletions client/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"fmt"
"math/rand"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -63,3 +64,13 @@ func shuffleStrings(list []string) {
list[i], list[j] = list[j], list[i]
}
}

// stoppedTimer returns a timer that's stopped and wouldn't fire until
// it's reset
func stoppedTimer() *time.Timer {
timer := time.NewTimer(0)
if !timer.Stop() {
<-timer.C
}
return timer
}

0 comments on commit 9050f5f

Please sign in to comment.