Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Edge trigger node updates #3873

Merged
merged 8 commits into from
Mar 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 96 additions & 69 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
vaultapi "github.com/hashicorp/vault/api"
"github.com/mitchellh/hashstructure"
"github.com/shirou/gopsutil/host"
)

Expand Down Expand Up @@ -130,6 +129,10 @@ type Client struct {
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
triggerDiscoveryCh chan struct{}

// triggerNodeUpdate triggers the client to mark the Node as changed and
// update it.
triggerNodeUpdate chan struct{}

// serversDiscoveredCh will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
Expand Down Expand Up @@ -209,6 +212,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
}

Expand Down Expand Up @@ -956,30 +960,81 @@ func (c *Client) reservePorts() {
func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintResponse) *structs.Node {
c.configLock.Lock()
defer c.configLock.Unlock()
for name, val := range response.Attributes {
if val == "" {

nodeHasChanged := false

for name, newVal := range response.Attributes {
oldVal := c.config.Node.Attributes[name]
if oldVal == newVal {
continue
}

nodeHasChanged = true
if newVal == "" {
delete(c.config.Node.Attributes, name)
} else {
c.config.Node.Attributes[name] = val
c.config.Node.Attributes[name] = newVal
}
}

// update node links and resources from the diff created from
// fingerprinting
for name, val := range response.Links {
if val == "" {
for name, newVal := range response.Links {
oldVal := c.config.Node.Links[name]
if oldVal == newVal {
continue
}

nodeHasChanged = true
if newVal == "" {
delete(c.config.Node.Links, name)
} else {
c.config.Node.Links[name] = val
c.config.Node.Links[name] = newVal
}
}

if response.Resources != nil {
if response.Resources != nil && !resourcesAreEqual(c.config.Node.Resources, response.Resources) {
nodeHasChanged = true
c.config.Node.Resources.Merge(response.Resources)
}

if nodeHasChanged {
c.updateNode()
}
return c.config.Node
}

// resourcesAreEqual is a temporary function to compare whether resources are
// equal. We can use this until we change fingerprinters to set pointers on a
// return type.
func resourcesAreEqual(first, second *structs.Resources) bool {
if first.CPU != second.CPU {
return false
}
if first.MemoryMB != second.MemoryMB {
return false
}
if first.DiskMB != second.DiskMB {
return false
}
if first.IOPS != second.IOPS {
return false
}
if len(first.Networks) != len(second.Networks) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just checks the length?

return false
}
for i, e := range first.Networks {
if len(second.Networks) < i {
return false
}
f := second.Networks[i]
if !e.Equals(f) {
return false
}
}
return true
}

// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
Expand All @@ -991,21 +1046,11 @@ func (c *Client) retryIntv(base time.Duration) time.Duration {
// registerAndHeartbeat is a long lived goroutine used to register the client
// and then start heartbeating to the server.
func (c *Client) registerAndHeartbeat() {
// Before registering capture the hashes of the Node's attribute and
// metadata maps. The hashes may be out of date with what registers but this
// is okay since the loop checking for node updates will detect this and
// reregister. This is necessary to avoid races between the periodic
// fingerprinters and the node registering.
attrHash, metaHash, err := nodeMapHashes(c.Node())
if err != nil {
c.logger.Printf("[ERR] client: failed to determine initial node hashes. May result in stale node being registered: %v", err)
}

// Register the node
c.retryRegisterNode()

// Start watching changes for node changes
go c.watchNodeUpdates(attrHash, metaHash)
go c.watchNodeUpdates()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
Expand Down Expand Up @@ -1086,40 +1131,6 @@ func (c *Client) run() {
}
}

// nodeMapHashes hashes the Attributes map and the Meta map of the passed Node
// and returns both hashes. If either hash cannot be computed, both returned
// hashes are zero and err describes which map failed.
func nodeMapHashes(node *structs.Node) (attrHash, metaHash uint64, err error) {
	// Hash the attribute map first; bail out before touching the meta map.
	if attrHash, err = hashstructure.Hash(node.Attributes, nil); err != nil {
		return 0, 0, fmt.Errorf("unable to calculate node attributes hash: %v", err)
	}

	// Hash the meta map.
	if metaHash, err = hashstructure.Hash(node.Meta, nil); err != nil {
		return 0, 0, fmt.Errorf("unable to calculate node meta hash: %v", err)
	}

	return attrHash, metaHash, nil
}

// hasNodeChanged recomputes the hashes of the node's attribute and meta maps
// under the config read lock and compares them with the passed-in values.
//
// When either hash differs it returns true together with the freshly computed
// hashes; otherwise it returns false and echoes the passed-in hashes back. A
// hashing error is only logged: the comparison still runs against whatever
// values nodeMapHashes returned.
func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) {
	c.configLock.RLock()
	defer c.configLock.RUnlock()

	// Hash the Node that the fingerprinters keep updating.
	curAttr, curMeta, hashErr := nodeMapHashes(c.config.Node)
	if hashErr != nil {
		c.logger.Printf("[DEBUG] client: unable to calculate node hashes: %v", hashErr)
	}

	// Nothing changed: hand the caller's own values back.
	if curAttr == oldAttrHash && curMeta == oldMetaHash {
		return false, oldAttrHash, oldMetaHash
	}
	return true, curAttr, curMeta
}

// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
Expand Down Expand Up @@ -1512,28 +1523,44 @@ OUTER:
}
}

// watchNodeUpdates periodically checks for changes to the node attributes or
// meta map. The passed hashes are the initial hash values for the attribute and
// metadata of the node respectively.
func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) {
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv)
// updateNode asks the client to refresh its node copy without ever blocking
// the caller. The request is a non-blocking send on triggerNodeUpdate; when
// the channel cannot accept another value the request is dropped.
func (c *Client) updateNode() {
	select {
	default:
		// The send would block, so an update is presumably already pending;
		// drop this trigger rather than wait.
	case c.triggerNodeUpdate <- struct{}{}:
		// Trigger queued for the node update goroutine.
	}
}

// watchNodeUpdates blocks until it is edge triggered. Once triggered,
// it will update the client node copy and re-register the node.
func (c *Client) watchNodeUpdates() {
var hasChanged bool
timer := time.NewTimer(c.retryIntv(nodeUpdateRetryIntv))
defer timer.Stop()

var changed bool
for {
select {
case <-time.After(c.retryIntv(nodeUpdateRetryIntv)):
changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash)
if changed {
c.logger.Printf("[DEBUG] client: state changed, updating node.")
case <-timer.C:
c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.")

// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()
// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()

c.retryRegisterNode()
c.retryRegisterNode()

hasChanged = false
case <-c.triggerNodeUpdate:
if hasChanged {
continue
}
hasChanged = true
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-c.shutdownCh:
return
}
Expand Down
37 changes: 2 additions & 35 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -126,38 +125,6 @@ func TestClient_Fingerprint(t *testing.T) {
require.NotEqual("", node.Attributes["driver.mock_driver"])
}

// TestClient_HasNodeChanged verifies that hasNodeChanged reports no change for
// an untouched node and reports a change after the attribute map and, as a
// separate step, the meta map are modified.
func TestClient_HasNodeChanged(t *testing.T) {
	t.Parallel()
	c := TestClient(t, nil)
	defer c.Shutdown()

	node := c.config.Node

	// Baseline hashes. A hashing failure invalidates every assertion below,
	// so fail immediately instead of only logging it.
	attrHash, err := hashstructure.Hash(node.Attributes, nil)
	if err != nil {
		t.Fatalf("unable to calculate node attributes hash: %v", err)
	}
	metaHash, err := hashstructure.Hash(node.Meta, nil)
	if err != nil {
		t.Fatalf("unable to calculate node meta hash: %v", err)
	}

	if changed, _, _ := c.hasNodeChanged(attrHash, metaHash); changed {
		t.Fatalf("Unexpected hash change.")
	}

	// Change node attribute
	node.Attributes["arch"] = "xyz_86"
	changed, newAttrHash, _ := c.hasNodeChanged(attrHash, metaHash)
	if !changed {
		t.Fatalf("Expected hash change in attributes: %d vs %d", attrHash, newAttrHash)
	}

	// Change node meta map. Compare against the refreshed attribute hash so
	// that only the meta map differs from the baseline; with the stale
	// attribute hash this check would pass even if meta changes went
	// undetected.
	node.Meta["foo"] = "bar"
	changed, _, newMetaHash := c.hasNodeChanged(newAttrHash, metaHash)
	if !changed {
		t.Fatalf("Expected hash change in meta map: %d vs %d", metaHash, newMetaHash)
	}
}

func TestClient_Fingerprint_Periodic(t *testing.T) {
driver.CheckForMockDriver(t)
t.Parallel()
Expand All @@ -166,8 +133,8 @@ func TestClient_Fingerprint_Periodic(t *testing.T) {
// our linter without explicit disabling.
c1 := TestClient(t, func(c *config.Config) {
c.Options = map[string]string{
driver.ShutdownPeriodicAfter: "true", // nolint: varcheck
driver.ShutdownPeriodicDuration: "3", // nolint: varcheck
driver.ShutdownPeriodicAfter: "true",
driver.ShutdownPeriodicDuration: "3",
}
})
defer c1.Shutdown()
Expand Down
4 changes: 2 additions & 2 deletions client/fingerprint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func (fm *FingerprintManager) fingerprint(name string, f fingerprint.Fingerprint
return false, err
}

fm.nodeLock.Lock()
if node := fm.updateNode(&response); node != nil {
fm.nodeLock.Lock()
fm.node = node
fm.nodeLock.Unlock()
}
fm.nodeLock.Unlock()

return response.Detected, nil
}
Expand Down
Loading