-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Edge trigger node updates #3873
Changes from 7 commits
7246c9d
4271123
cd5be3b
44fdb74
2969611
4b6bcf7
d635323
bf72619
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,6 @@ import ( | |
"github.com/hashicorp/nomad/nomad/structs" | ||
nconfig "github.com/hashicorp/nomad/nomad/structs/config" | ||
vaultapi "github.com/hashicorp/vault/api" | ||
"github.com/mitchellh/hashstructure" | ||
"github.com/shirou/gopsutil/host" | ||
) | ||
|
||
|
@@ -130,6 +129,10 @@ type Client struct { | |
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery | ||
triggerDiscoveryCh chan struct{} | ||
|
||
// triggerNodeUpdate triggers the client to mark the Node as changed and | ||
// update it. | ||
triggerNodeUpdate chan struct{} | ||
|
||
// serversDiscoveredCh will be ticked whenever Consul discovery completes | ||
// successfully | ||
serversDiscoveredCh chan struct{} | ||
|
@@ -209,6 +212,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic | |
allocUpdates: make(chan *structs.Allocation, 64), | ||
shutdownCh: make(chan struct{}), | ||
triggerDiscoveryCh: make(chan struct{}), | ||
triggerNodeUpdate: make(chan struct{}, 8), | ||
serversDiscoveredCh: make(chan struct{}), | ||
} | ||
|
||
|
@@ -956,30 +960,81 @@ func (c *Client) reservePorts() { | |
func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintResponse) *structs.Node { | ||
c.configLock.Lock() | ||
defer c.configLock.Unlock() | ||
for name, val := range response.Attributes { | ||
if val == "" { | ||
|
||
nodeHasChanged := false | ||
|
||
for name, newVal := range response.Attributes { | ||
oldVal := c.config.Node.Attributes[name] | ||
if oldVal == newVal { | ||
continue | ||
} | ||
|
||
nodeHasChanged = true | ||
if newVal == "" { | ||
delete(c.config.Node.Attributes, name) | ||
} else { | ||
c.config.Node.Attributes[name] = val | ||
c.config.Node.Attributes[name] = newVal | ||
} | ||
} | ||
|
||
// update node links and resources from the diff created from | ||
// fingerprinting | ||
for name, val := range response.Links { | ||
if val == "" { | ||
for name, newVal := range response.Links { | ||
oldVal := c.config.Node.Links[name] | ||
if oldVal == newVal { | ||
continue | ||
} | ||
|
||
nodeHasChanged = true | ||
if newVal == "" { | ||
delete(c.config.Node.Links, name) | ||
} else { | ||
c.config.Node.Links[name] = val | ||
c.config.Node.Links[name] = newVal | ||
} | ||
} | ||
|
||
if response.Resources != nil { | ||
if response.Resources != nil && !resourcesAreEqual(c.config.Node.Resources, response.Resources) { | ||
nodeHasChanged = true | ||
c.config.Node.Resources.Merge(response.Resources) | ||
} | ||
|
||
if nodeHasChanged { | ||
c.updateNode() | ||
} | ||
return c.config.Node | ||
} | ||
|
||
// resourcesAreEqual is a temporary function to compare whether resources are | ||
// equal. We can use this until we change fingerprinters to set pointers on a | ||
// return type. | ||
func resourcesAreEqual(first, second *structs.Resources) bool { | ||
if first.CPU != second.CPU { | ||
return false | ||
} | ||
if first.MemoryMB != second.MemoryMB { | ||
return false | ||
} | ||
if first.DiskMB != second.DiskMB { | ||
return false | ||
} | ||
if first.IOPS != second.IOPS { | ||
return false | ||
} | ||
if len(first.Networks) != len(second.Networks) { | ||
return false | ||
} | ||
for i, e := range first.Networks { | ||
if len(second.Networks) < i { | ||
return false | ||
} | ||
f := second.Networks[i] | ||
if !e.Equals(f) { | ||
return false | ||
} | ||
} | ||
return true | ||
} | ||
|
||
// retryIntv calculates a retry interval value given the base | ||
func (c *Client) retryIntv(base time.Duration) time.Duration { | ||
if c.config.DevMode { | ||
|
@@ -991,21 +1046,11 @@ func (c *Client) retryIntv(base time.Duration) time.Duration { | |
// registerAndHeartbeat is a long lived goroutine used to register the client | ||
// and then start heartbeating to the server. | ||
func (c *Client) registerAndHeartbeat() { | ||
// Before registering capture the hashes of the Node's attribute and | ||
// metadata maps. The hashes may be out of date with what registers but this | ||
// is okay since the loop checking for node updates will detect this and | ||
// reregister. This is necessary to avoid races between the periodic | ||
// fingerprinters and the node registering. | ||
attrHash, metaHash, err := nodeMapHashes(c.Node()) | ||
if err != nil { | ||
c.logger.Printf("[ERR] client: failed to determine initial node hashes. May result in stale node being registered: %v", err) | ||
} | ||
|
||
// Register the node | ||
c.retryRegisterNode() | ||
|
||
// Start watching changes for node changes | ||
go c.watchNodeUpdates(attrHash, metaHash) | ||
go c.watchNodeUpdates() | ||
|
||
// Setup the heartbeat timer, for the initial registration | ||
// we want to do this quickly. We want to do it extra quickly | ||
|
@@ -1086,40 +1131,6 @@ func (c *Client) run() { | |
} | ||
} | ||
|
||
// nodeMapHashes returns the hashes of the passed Node's attribute and metadata | ||
// maps. | ||
func nodeMapHashes(node *structs.Node) (attrHash, metaHash uint64, err error) { | ||
attrHash, err = hashstructure.Hash(node.Attributes, nil) | ||
if err != nil { | ||
return 0, 0, fmt.Errorf("unable to calculate node attributes hash: %v", err) | ||
} | ||
// Calculate node meta map hash | ||
metaHash, err = hashstructure.Hash(node.Meta, nil) | ||
if err != nil { | ||
return 0, 0, fmt.Errorf("unable to calculate node meta hash: %v", err) | ||
} | ||
return attrHash, metaHash, nil | ||
} | ||
|
||
// hasNodeChanged calculates a hash for the node attributes- and meta map. | ||
// The new hash values are compared against the old (passed-in) hash values to | ||
// determine if the node properties have changed. It returns the new hash values | ||
// in case they are different from the old hash values. | ||
func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) { | ||
c.configLock.RLock() | ||
defer c.configLock.RUnlock() | ||
|
||
// Check if the Node that is being updated by fingerprinters has changed. | ||
newAttrHash, newMetaHash, err := nodeMapHashes(c.config.Node) | ||
if err != nil { | ||
c.logger.Printf("[DEBUG] client: unable to calculate node hashes: %v", err) | ||
} | ||
if newAttrHash != oldAttrHash || newMetaHash != oldMetaHash { | ||
return true, newAttrHash, newMetaHash | ||
} | ||
return false, oldAttrHash, oldMetaHash | ||
} | ||
|
||
// retryRegisterNode is used to register the node or update the registration and | ||
// retry in case of failure. | ||
func (c *Client) retryRegisterNode() { | ||
|
@@ -1512,28 +1523,44 @@ OUTER: | |
} | ||
} | ||
|
||
// watchNodeUpdates periodically checks for changes to the node attributes or | ||
// meta map. The passed hashes are the initial hash values for the attribute and | ||
// metadata of the node respectively. | ||
func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) { | ||
c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv) | ||
// updateNode triggers a client to update its node copy if it isn't doing | ||
// so already | ||
func (c *Client) updateNode() { | ||
select { | ||
case c.triggerNodeUpdate <- struct{}{}: | ||
// Node update goroutine was released to execute | ||
default: | ||
// Node update goroutine was already running | ||
} | ||
} | ||
|
||
// watchNodeUpdates blocks until it is edge triggered. Once triggered, | ||
// it will update the client node copy and re-register the node. | ||
func (c *Client) watchNodeUpdates() { | ||
var hasChanged bool | ||
timer := time.NewTimer(c.retryIntv(nodeUpdateRetryIntv)) | ||
defer timer.Stop() | ||
|
||
var changed bool | ||
for { | ||
select { | ||
case <-time.After(c.retryIntv(nodeUpdateRetryIntv)): | ||
changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash) | ||
if changed { | ||
c.logger.Printf("[DEBUG] client: state changed, updating node.") | ||
case <-timer.C: | ||
if !hasChanged { | ||
continue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now this code could never fire even if there is a node update |
||
} | ||
c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.") | ||
|
||
// Update the config copy. | ||
c.configLock.Lock() | ||
node := c.config.Node.Copy() | ||
c.configCopy.Node = node | ||
c.configLock.Unlock() | ||
// Update the config copy. | ||
c.configLock.Lock() | ||
node := c.config.Node.Copy() | ||
c.configCopy.Node = node | ||
c.configLock.Unlock() | ||
|
||
c.retryRegisterNode() | ||
} | ||
c.retryRegisterNode() | ||
|
||
hasChanged = false | ||
timer.Reset(c.retryIntv(nodeUpdateRetryIntv)) | ||
case <-c.triggerNodeUpdate: | ||
hasChanged = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not put all timer logic in this case and then remove the if statement at the top of the timer case and the reset at the bottom:
|
||
case <-c.shutdownCh: | ||
return | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just checks the length?