Skip to content

Commit

Permalink
edge trigger node update
Browse files Browse the repository at this point in the history
  • Loading branch information
chelseakomlo committed Feb 14, 2018
1 parent 6820c96 commit 1313d88
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 103 deletions.
159 changes: 89 additions & 70 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
vaultapi "github.com/hashicorp/vault/api"
"github.com/mitchellh/hashstructure"
"github.com/shirou/gopsutil/host"
)

Expand Down Expand Up @@ -78,6 +77,11 @@ const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second

// nodeUpdateBatchIntv is minimum interval on which we update the client's
// copy of the node. We do this so that every fingerprint update won't
// trigger the node updating.
nodeUpdateBatchIntv = 30 * time.Second
)

// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
Expand Down Expand Up @@ -121,6 +125,10 @@ type Client struct {
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
triggerDiscoveryCh chan struct{}

// triggerNodeUpdate triggers the client to mark the Node as changed and
// update it.
triggerNodeUpdate chan bool

// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
Expand Down Expand Up @@ -194,6 +202,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
shutdownCh: make(chan struct{}),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan bool),
serversDiscoveredCh: make(chan struct{}),
}

Expand Down Expand Up @@ -927,30 +936,72 @@ func (c *Client) reservePorts() {
func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintResponse) *structs.Node {
c.configLock.Lock()
defer c.configLock.Unlock()
for name, val := range response.Attributes {
if val == "" {

var nodeHasChanged bool

for name, new_val := range response.Attributes {
old_val := c.config.Node.Attributes[name]
if old_val == new_val {
continue
}

nodeHasChanged = true
if new_val == "" {
delete(c.config.Node.Attributes, name)
} else {
c.config.Node.Attributes[name] = val
c.config.Node.Attributes[name] = new_val
}
}

// update node links and resources from the diff created from
// fingerprinting
for name, val := range response.Links {
if val == "" {
for name, new_val := range response.Links {
old_val := c.config.Node.Links[name]
if old_val == new_val {
continue
}

nodeHasChanged = true
if new_val == "" {
delete(c.config.Node.Links, name)
} else {
c.config.Node.Links[name] = val
c.config.Node.Links[name] = new_val
}
}

if response.Resources != nil {
if response.Resources != nil && resourcesAreEqual(c.config.Node.Resources, response.Resources) {
nodeHasChanged = true
c.config.Node.Resources.Merge(response.Resources)
}

if nodeHasChanged {
c.updateNode()
}
return c.config.Node
}

// resourcesAreEqual is a temporary function to compare whether resources are
// equal. We can use this until we change fingerprinters to set pointers on a
// return type.
func resourcesAreEqual(first, second *structs.Resources) bool {
if first.CPU != second.CPU {
return false
}
if first.MemoryMB != second.MemoryMB {
return false
}
if first.DiskMB != second.DiskMB {
return false
}
if first.IOPS != second.IOPS {
return false
}
if len(first.Networks) == 0 && len(second.Networks) == 0 {
return false
}
return true
}

// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
Expand All @@ -962,21 +1013,11 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
// registerAndHeartbeat is a long lived goroutine used to register the client
// and then start heartbeatng to the server.
func (c *Client) registerAndHeartbeat() {
// Before registering capture the hashes of the Node's attribute and
// metadata maps. The hashes may be out of date with what registers but this
// is okay since the loop checking for node updates will detect this and
// reregister. This is necessary to avoid races between the periodic
// fingerprinters and the node registering.
attrHash, metaHash, err := nodeMapHashes(c.Node())
if err != nil {
c.logger.Printf("[ERR] client: failed to determine initial node hashes. May result in stale node being registered: %v", err)
}

// Register the node
c.retryRegisterNode()

// Start watching changes for node changes
go c.watchNodeUpdates(attrHash, metaHash)
go c.watchNodeUpdates()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
Expand Down Expand Up @@ -1057,40 +1098,6 @@ func (c *Client) run() {
}
}

// nodeMapHashes returns the hashes of the passed Node's attribute and metadata
// maps.
func nodeMapHashes(node *structs.Node) (attrHash, metaHash uint64, err error) {
attrHash, err = hashstructure.Hash(node.Attributes, nil)
if err != nil {
return 0, 0, fmt.Errorf("unable to calculate node attributes hash: %v", err)
}
// Calculate node meta map hash
metaHash, err = hashstructure.Hash(node.Meta, nil)
if err != nil {
return 0, 0, fmt.Errorf("unable to calculate node meta hash: %v", err)
}
return attrHash, metaHash, nil
}

// hasNodeChanged calculates a hash for the node attributes- and meta map.
// The new hash values are compared against the old (passed-in) hash values to
// determine if the node properties have changed. It returns the new hash values
// in case they are different from the old hash values.
func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
c.configLock.RLock()
defer c.configLock.RUnlock()

// Check if the Node that is being updated by fingerprinters has changed.
newAttrHash, newMetaHash, err := nodeMapHashes(c.config.Node)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node hashes: %v", err)
}
if newAttrHash != oldAttrHash || newMetaHash != oldMetaHash {
return true, newAttrHash, newMetaHash
}
return false, oldAttrHash, oldMetaHash
}

// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
Expand Down Expand Up @@ -1484,28 +1491,40 @@ OUTER:
}
}

// watchNodeUpdates periodically checks for changes to the node attributes or
// meta map. The passed hashes are the initial hash values for the attribute and
// metadata of the node respectively.
func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) {
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv)
// updateNode triggers a client to update its node copy if it isn't doing
// so already
func (c *Client) updateNode() {
select {
case c.triggerNodeUpdate <- true:
// Node update goroutine was released to execute
default:
// Node update goroutine was already running
}
}

// watchNodeUpdates blocks until it is edge triggered. Once triggered,
// it will update the client node copy and re-register the node.
func (c *Client) watchNodeUpdates() {
c.logger.Printf("[DEBUG] client: starting process to watch for node updates.")

var changed bool
intv := c.retryIntv(nodeUpdateBatchIntv)
for {
select {
case <-time.After(c.retryIntv(nodeUpdateRetryIntv)):
changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash)
if changed {
c.logger.Printf("[DEBUG] client: state changed, updating node.")

// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()
case <-c.triggerNodeUpdate:
c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.")

// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()

c.retryRegisterNode()

case <-time.After(intv):
continue

c.retryRegisterNode()
}
case <-c.shutdownCh:
return
}
Expand Down
33 changes: 0 additions & 33 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -223,38 +222,6 @@ func TestClient_Fingerprint(t *testing.T) {
require.NotEqual("", node.Attributes["driver.mock_driver"])
}

func TestClient_HasNodeChanged(t *testing.T) {
t.Parallel()
c := testClient(t, nil)
defer c.Shutdown()

node := c.config.Node
attrHash, err := hashstructure.Hash(node.Attributes, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err)
}
// Calculate node meta map hash
metaHash, err := hashstructure.Hash(node.Meta, nil)
if err != nil {
c.logger.Printf("[DEBUG] client: unable to calculate node meta hash: %v", err)
}
if changed, _, _ := c.hasNodeChanged(attrHash, metaHash); changed {
t.Fatalf("Unexpected hash change.")
}

// Change node attribute
node.Attributes["arch"] = "xyz_86"
if changed, newAttrHash, _ := c.hasNodeChanged(attrHash, metaHash); !changed {
t.Fatalf("Expected hash change in attributes: %d vs %d", attrHash, newAttrHash)
}

// Change node meta map
node.Meta["foo"] = "bar"
if changed, _, newMetaHash := c.hasNodeChanged(attrHash, metaHash); !changed {
t.Fatalf("Expected hash change in meta map: %d vs %d", metaHash, newMetaHash)
}
}

func TestClient_Fingerprint_Periodic(t *testing.T) {
driver.CheckForMockDriver(t)
t.Parallel()
Expand Down

0 comments on commit 1313d88

Please sign in to comment.