Skip to content

Commit

Permalink
Merge pull request #3873 from hashicorp/r-edge-trigger-node-watcher
Browse files Browse the repository at this point in the history
Edge trigger node updates
  • Loading branch information
chelseakomlo authored Mar 1, 2018
2 parents 2627721 + bf72619 commit a17690c
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 182 deletions.
165 changes: 96 additions & 69 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
vaultapi "github.com/hashicorp/vault/api"
"github.com/mitchellh/hashstructure"
"github.com/shirou/gopsutil/host"
)

Expand Down Expand Up @@ -130,6 +129,10 @@ type Client struct {
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
triggerDiscoveryCh chan struct{}

// triggerNodeUpdate triggers the client to mark the Node as changed and
// update it.
triggerNodeUpdate chan struct{}

// serversDiscoveredCh will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
Expand Down Expand Up @@ -209,6 +212,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
}

Expand Down Expand Up @@ -956,30 +960,81 @@ func (c *Client) reservePorts() {
func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintResponse) *structs.Node {
c.configLock.Lock()
defer c.configLock.Unlock()
for name, val := range response.Attributes {
if val == "" {

nodeHasChanged := false

for name, newVal := range response.Attributes {
oldVal := c.config.Node.Attributes[name]
if oldVal == newVal {
continue
}

nodeHasChanged = true
if newVal == "" {
delete(c.config.Node.Attributes, name)
} else {
c.config.Node.Attributes[name] = val
c.config.Node.Attributes[name] = newVal
}
}

// update node links and resources from the diff created from
// fingerprinting
for name, val := range response.Links {
if val == "" {
for name, newVal := range response.Links {
oldVal := c.config.Node.Links[name]
if oldVal == newVal {
continue
}

nodeHasChanged = true
if newVal == "" {
delete(c.config.Node.Links, name)
} else {
c.config.Node.Links[name] = val
c.config.Node.Links[name] = newVal
}
}

if response.Resources != nil {
if response.Resources != nil && !resourcesAreEqual(c.config.Node.Resources, response.Resources) {
nodeHasChanged = true
c.config.Node.Resources.Merge(response.Resources)
}

if nodeHasChanged {
c.updateNode()
}
return c.config.Node
}

// resourcesAreEqual reports whether two resource sets are equal. It is a
// temporary helper that can be used until fingerprinters are changed to set
// pointers on a return type.
//
// The scalar fields CPU, MemoryMB, DiskMB and IOPS are compared directly.
// Networks are compared pairwise by index using Equals, so the same networks
// listed in a different order are reported as unequal.
//
// Two nil values are equal; a nil value is never equal to a non-nil one.
func resourcesAreEqual(first, second *structs.Resources) bool {
	// Guard against nil so that comparing an unset resource block does not
	// panic on the field accesses below.
	if first == nil || second == nil {
		return first == second
	}

	if first.CPU != second.CPU ||
		first.MemoryMB != second.MemoryMB ||
		first.DiskMB != second.DiskMB ||
		first.IOPS != second.IOPS {
		return false
	}

	if len(first.Networks) != len(second.Networks) {
		return false
	}

	// The lengths match, so every index of first.Networks is also a valid
	// index of second.Networks; no further bounds check is needed.
	for i, network := range first.Networks {
		if !network.Equals(second.Networks[i]) {
			return false
		}
	}
	return true
}

// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
Expand All @@ -991,21 +1046,11 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
// registerAndHeartbeat is a long lived goroutine used to register the client
// and then start heartbeating to the server.
func (c *Client) registerAndHeartbeat() {
// Before registering capture the hashes of the Node's attribute and
// metadata maps. The hashes may be out of date with what registers but this
// is okay since the loop checking for node updates will detect this and
// reregister. This is necessary to avoid races between the periodic
// fingerprinters and the node registering.
attrHash, metaHash, err := nodeMapHashes(c.Node())
if err != nil {
c.logger.Printf("[ERR] client: failed to determine initial node hashes. May result in stale node being registered: %v", err)
}

// Register the node
c.retryRegisterNode()

// Start watching changes for node changes
go c.watchNodeUpdates(attrHash, metaHash)
go c.watchNodeUpdates()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
Expand Down Expand Up @@ -1086,40 +1131,6 @@ func (c *Client) run() {
}
}

// nodeMapHashes hashes the Attributes map and the Meta map of the given node
// and returns both hashes. If either hash cannot be computed, both returned
// hashes are zero and the error describes which map failed.
func nodeMapHashes(node *structs.Node) (attrHash, metaHash uint64, err error) {
	// Hash the attribute map first; a failure here short-circuits.
	attrs, hashErr := hashstructure.Hash(node.Attributes, nil)
	if hashErr != nil {
		return 0, 0, fmt.Errorf("unable to calculate node attributes hash: %v", hashErr)
	}

	// Then hash the metadata map.
	meta, hashErr := hashstructure.Hash(node.Meta, nil)
	if hashErr != nil {
		return 0, 0, fmt.Errorf("unable to calculate node meta hash: %v", hashErr)
	}

	return attrs, meta, nil
}

// hasNodeChanged recomputes the hashes of the node's attribute and meta maps
// while holding the config read lock and compares them with the hashes passed
// in. When either hash differs it returns true together with the freshly
// computed hashes; otherwise it returns false and echoes back the old hashes.
//
// A hashing failure is only logged at debug level; the comparison then runs
// against the values returned alongside the error.
func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
	c.configLock.RLock()
	defer c.configLock.RUnlock()

	// Hash the Node that the fingerprinters keep updating.
	curAttrHash, curMetaHash, hashErr := nodeMapHashes(c.config.Node)
	if hashErr != nil {
		c.logger.Printf("[DEBUG] client: unable to calculate node hashes: %v", hashErr)
	}

	// Nothing changed: hand the caller's hashes straight back.
	if curAttrHash == oldAttrHash && curMetaHash == oldMetaHash {
		return false, oldAttrHash, oldMetaHash
	}
	return true, curAttrHash, curMetaHash
}

// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
Expand Down Expand Up @@ -1512,28 +1523,44 @@ OUTER:
}
}

// watchNodeUpdates periodically checks for changes to the node attributes or
// meta map. The passed hashes are the initial hash values for the attribute and
// metadata of the node respectively.
func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) {
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv)
// updateNode signals that the client's node copy needs to be refreshed. The
// signal is delivered with a non-blocking send on triggerNodeUpdate, so the
// caller never waits: if the channel cannot accept the value right now the
// signal is dropped.
func (c *Client) updateNode() {
	signal := struct{}{}

	select {
	case c.triggerNodeUpdate <- signal:
		// The update request was queued.
	default:
		// The channel could not take another signal; skip this one rather
		// than block the caller.
	}
}

// watchNodeUpdates blocks until it is edge triggered. Once triggered,
// it will update the client node copy and re-register the node.
func (c *Client) watchNodeUpdates() {
var hasChanged bool
timer := time.NewTimer(c.retryIntv(nodeUpdateRetryIntv))
defer timer.Stop()

var changed bool
for {
select {
case <-time.After(c.retryIntv(nodeUpdateRetryIntv)):
changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash)
if changed {
c.logger.Printf("[DEBUG] client: state changed, updating node.")
case <-timer.C:
c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.")

// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()
// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()

c.retryRegisterNode()
c.retryRegisterNode()

hasChanged = false
case <-c.triggerNodeUpdate:
if hasChanged {
continue
}
hasChanged = true
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-c.shutdownCh:
return
}
Expand Down
37 changes: 2 additions & 35 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -126,38 +125,6 @@ func TestClient_Fingerprint(t *testing.T) {
require.NotEqual("", node.Attributes["driver.mock_driver"])
}

// TestClient_HasNodeChanged verifies that hasNodeChanged reports no change
// for an untouched node, and reports a change after the attribute map and
// then the meta map are modified.
func TestClient_HasNodeChanged(t *testing.T) {
	t.Parallel()
	c := TestClient(t, nil)
	defer c.Shutdown()

	// Compute the baseline hashes. Without them every later comparison is
	// meaningless, so a hashing failure aborts the test instead of being
	// logged and ignored.
	node := c.config.Node
	attrHash, err := hashstructure.Hash(node.Attributes, nil)
	if err != nil {
		t.Fatalf("unable to calculate node attributes hash: %v", err)
	}
	metaHash, err := hashstructure.Hash(node.Meta, nil)
	if err != nil {
		t.Fatalf("unable to calculate node meta hash: %v", err)
	}
	if changed, _, _ := c.hasNodeChanged(attrHash, metaHash); changed {
		t.Fatalf("Unexpected hash change.")
	}

	// Change node attribute
	node.Attributes["arch"] = "xyz_86"
	changed, newAttrHash, _ := c.hasNodeChanged(attrHash, metaHash)
	if !changed {
		t.Fatalf("Expected hash change in attributes: %d vs %d", attrHash, newAttrHash)
	}

	// Carry the updated attribute hash forward. Otherwise the stale attribute
	// hash alone would make the next call report a change, and the meta map
	// check below would pass even if meta changes went undetected.
	attrHash = newAttrHash

	// Change node meta map
	node.Meta["foo"] = "bar"
	if changed, _, newMetaHash := c.hasNodeChanged(attrHash, metaHash); !changed {
		t.Fatalf("Expected hash change in meta map: %d vs %d", metaHash, newMetaHash)
	}
}

func TestClient_Fingerprint_Periodic(t *testing.T) {
driver.CheckForMockDriver(t)
t.Parallel()
Expand All @@ -166,8 +133,8 @@ func TestClient_Fingerprint_Periodic(t *testing.T) {
// our linter without explicit disabling.
c1 := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
driver.ShutdownPeriodicAfter: "true", // nolint: varcheck
driver.ShutdownPeriodicDuration: "3", // nolint: varcheck
driver.ShutdownPeriodicAfter: "true",
driver.ShutdownPeriodicDuration: "3",
}
})
defer c1.Shutdown()
Expand Down
4 changes: 2 additions & 2 deletions client/fingerprint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func (fm *FingerprintManager) fingerprint(name string, f fingerprint.Fingerprint
return false, err
}

fm.nodeLock.Lock()
if node := fm.updateNode(&response); node != nil {
fm.nodeLock.Lock()
fm.node = node
fm.nodeLock.Unlock()
}
fm.nodeLock.Unlock()

return response.Detected, nil
}
Expand Down
Loading

0 comments on commit a17690c

Please sign in to comment.