From 7b0f18b4bcb4ccc9f3180e040dfc6f1baaf1428d Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Wed, 14 Feb 2018 14:35:15 -0500 Subject: [PATCH] edge trigger node update test update config copy trigger --- client/client.go | 138 ++++++++++++++++------------- client/client_test.go | 66 ++++++++------ client/fingerprint_manager_test.go | 33 +++++++ 3 files changed, 148 insertions(+), 89 deletions(-) diff --git a/client/client.go b/client/client.go index e27d2d62caf..c9bc7dfe651 100644 --- a/client/client.go +++ b/client/client.go @@ -31,7 +31,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" nconfig "github.com/hashicorp/nomad/nomad/structs/config" vaultapi "github.com/hashicorp/vault/api" - "github.com/mitchellh/hashstructure" "github.com/shirou/gopsutil/host" ) @@ -121,6 +120,10 @@ type Client struct { // triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery triggerDiscoveryCh chan struct{} + // triggerNodeUpdate triggers the client to mark the Node as changed and + // update it. + triggerNodeUpdate chan struct{} + // discovered will be ticked whenever Consul discovery completes // successfully serversDiscoveredCh chan struct{} @@ -194,6 +197,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic shutdownCh: make(chan struct{}), servers: newServerList(), triggerDiscoveryCh: make(chan struct{}), + triggerNodeUpdate: make(chan struct{}), serversDiscoveredCh: make(chan struct{}), } @@ -927,30 +931,72 @@ func (c *Client) reservePorts() { func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintResponse) *structs.Node { c.configLock.Lock() defer c.configLock.Unlock() - for name, val := range response.Attributes { - if val == "" { + + var nodeHasChanged bool + + for name, new_val := range response.Attributes { + old_val := c.config.Node.Attributes[name] + if old_val == new_val { + continue + } + + nodeHasChanged = true + if new_val == "" { delete(c.config.Node.Attributes, name) } else { - c.config.Node.Attributes[name] = val + c.config.Node.Attributes[name] = new_val } } // update node links and resources from the diff created from // fingerprinting - for name, val := range response.Links { - if val == "" { + for name, new_val := range response.Links { + old_val := c.config.Node.Links[name] + if old_val == new_val { + continue + } + + nodeHasChanged = true + if new_val == "" { delete(c.config.Node.Links, name) } else { - c.config.Node.Links[name] = val + c.config.Node.Links[name] = new_val } } - if response.Resources != nil { + if response.Resources != nil && !resourcesAreEqual(c.config.Node.Resources, response.Resources) { + nodeHasChanged = true c.config.Node.Resources.Merge(response.Resources) } + + if nodeHasChanged { + c.updateNode() + } return c.config.Node } +// resourcesAreEqual is a temporary function to compare whether resources are +// equal. We can use this until we change fingerprinters to set pointers on a +// return type. +func resourcesAreEqual(first, second *structs.Resources) bool { + if first.CPU != second.CPU { + return false + } + if first.MemoryMB != second.MemoryMB { + return false + } + if first.DiskMB != second.DiskMB { + return false + } + if first.IOPS != second.IOPS { + return false + } + if len(first.Networks) != len(second.Networks) { + return false + } + return true +} + // retryIntv calculates a retry interval value given the base func (c *Client) retryIntv(base time.Duration) time.Duration { if c.config.DevMode { @@ -962,21 +1008,11 @@ func (c *Client) retryIntv(base time.Duration) time.Duration { // registerAndHeartbeat is a long lived goroutine used to register the client // and then start heartbeatng to the server. func (c *Client) registerAndHeartbeat() { - // Before registering capture the hashes of the Node's attribute and - // metadata maps. The hashes may be out of date with what registers but this - // is okay since the loop checking for node updates will detect this and - // reregister. This is necessary to avoid races between the periodic - // fingerprinters and the node registering. - attrHash, metaHash, err := nodeMapHashes(c.Node()) - if err != nil { - c.logger.Printf("[ERR] client: failed to determine initial node hashes. May result in stale node being registered: %v", err) - } - // Register the node c.retryRegisterNode() // Start watching changes for node changes - go c.watchNodeUpdates(attrHash, metaHash) + go c.watchNodeUpdates() // Setup the heartbeat timer, for the initial registration // we want to do this quickly. We want to do it extra quickly @@ -1057,40 +1093,6 @@ func (c *Client) run() { } } -// nodeMapHashes returns the hashes of the passed Node's attribute and metadata -// maps. -func nodeMapHashes(node *structs.Node) (attrHash, metaHash uint64, err error) { - attrHash, err = hashstructure.Hash(node.Attributes, nil) - if err != nil { - return 0, 0, fmt.Errorf("unable to calculate node attributes hash: %v", err) - } - // Calculate node meta map hash - metaHash, err = hashstructure.Hash(node.Meta, nil) - if err != nil { - return 0, 0, fmt.Errorf("unable to calculate node meta hash: %v", err) - } - return attrHash, metaHash, nil -} - -// hasNodeChanged calculates a hash for the node attributes- and meta map. -// The new hash values are compared against the old (passed-in) hash values to -// determine if the node properties have changed. It returns the new hash values -// in case they are different from the old hash values. -func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, uint64, uint64) { - c.configLock.RLock() - defer c.configLock.RUnlock() - - // Check if the Node that is being updated by fingerprinters has changed. - newAttrHash, newMetaHash, err := nodeMapHashes(c.config.Node) - if err != nil { - c.logger.Printf("[DEBUG] client: unable to calculate node hashes: %v", err) - } - if newAttrHash != oldAttrHash || newMetaHash != oldMetaHash { - return true, newAttrHash, newMetaHash - } - return false, oldAttrHash, oldMetaHash -} - // retryRegisterNode is used to register the node or update the registration and // retry in case of failure. func (c *Client) retryRegisterNode() { @@ -1484,19 +1486,28 @@ OUTER: } } -// watchNodeUpdates periodically checks for changes to the node attributes or -// meta map. The passed hashes are the initial hash values for the attribute and -// metadata of the node respectively. -func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) { - c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv) +// updateNode triggers a client to update its node copy if it isn't doing +// so already +func (c *Client) updateNode() { + select { + case c.triggerNodeUpdate <- struct{}{}: + // Node update goroutine was released to execute + default: + // Node update goroutine was already running + } +} + +// watchNodeUpdates blocks until it is edge triggered. Once triggered, +// it will update the client node copy and re-register the node. +func (c *Client) watchNodeUpdates() { + c.logger.Printf("[DEBUG] client: starting process to watch for node updates.") - var changed bool + var hasChanged bool for { select { case <-time.After(c.retryIntv(nodeUpdateRetryIntv)): - changed, attrHash, metaHash = c.hasNodeChanged(attrHash, metaHash) - if changed { - c.logger.Printf("[DEBUG] client: state changed, updating node.") + if hasChanged { + c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.") // Update the config copy. c.configLock.Lock() @@ -1506,6 +1517,9 @@ func (c *Client) watchNodeUpdates(attrHash, metaHash uint64) { c.retryRegisterNode() } + hasChanged = false + case <-c.triggerNodeUpdate: + hasChanged = true case <-c.shutdownCh: return } diff --git a/client/client_test.go b/client/client_test.go index da6a14fb7b4..b83e203cb53 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -24,7 +24,6 @@ import ( "github.com/hashicorp/nomad/nomad/structs" nconfig "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/testutil" - "github.com/mitchellh/hashstructure" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -223,36 +222,49 @@ func TestClient_Fingerprint(t *testing.T) { require.NotEqual("", node.Attributes["driver.mock_driver"]) } -func TestClient_HasNodeChanged(t *testing.T) { +func TestClient_TriggerNodeUpdate(t *testing.T) { + driver.CheckForMockDriver(t) t.Parallel() - c := testClient(t, nil) - defer c.Shutdown() - node := c.config.Node - attrHash, err := hashstructure.Hash(node.Attributes, nil) - if err != nil { - c.logger.Printf("[DEBUG] client: unable to calculate node attributes hash: %v", err) - } - // Calculate node meta map hash - metaHash, err := hashstructure.Hash(node.Meta, nil) - if err != nil { - c.logger.Printf("[DEBUG] client: unable to calculate node meta hash: %v", err) - } - if changed, _, _ := c.hasNodeChanged(attrHash, metaHash); changed { - t.Fatalf("Unexpected hash change.") - } + // these constants are only defined when nomad_test is enabled, so these fail + // our linter without explicit disabling. + c1 := testClient(t, func(c *config.Config) { + c.Options = map[string]string{ + driver.ShutdownPeriodicAfter: "true", // nolint: varcheck + driver.ShutdownPeriodicDuration: "3", // nolint: varcheck + } + }) + defer c1.Shutdown() - // Change node attribute - node.Attributes["arch"] = "xyz_86" - if changed, newAttrHash, _ := c.hasNodeChanged(attrHash, metaHash); !changed { - t.Fatalf("Expected hash change in attributes: %d vs %d", attrHash, newAttrHash) - } + mockDriverName := "driver.mock_driver" - // Change node meta map - node.Meta["foo"] = "bar" - if changed, _, newMetaHash := c.hasNodeChanged(attrHash, metaHash); !changed { - t.Fatalf("Expected hash change in meta map: %d vs %d", metaHash, newMetaHash) - } + go c1.watchNodeUpdates() + c1.updateNode() + // This needs to be directly called as otherwise the client hangs on + // attempt to register with a server. S[ecifically, retryRegisterNode is + // blocking + + // test that the client's copy of the node is also updated + testutil.WaitForResult(func() (bool, error) { + mockDriverStatusCopy := c1.configCopy.Node.Attributes[mockDriverName] + if mockDriverStatusCopy == "" { + return false, fmt.Errorf("mock driver attribute should be set on the client") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // test that the client's copy of the node is also updated + testutil.WaitForResult(func() (bool, error) { + mockDriverStatusCopy := c1.configCopy.Node.Attributes[mockDriverName] + if mockDriverStatusCopy != "" { + return false, fmt.Errorf("mock driver attribute should not be set on the client") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) } func TestClient_Fingerprint_Periodic(t *testing.T) { diff --git a/client/fingerprint_manager_test.go b/client/fingerprint_manager_test.go index fe37966e3e3..f2b57791e19 100644 --- a/client/fingerprint_manager_test.go +++ b/client/fingerprint_manager_test.go @@ -45,6 +45,39 @@ func TestFingerprintManager_Run_MockDriver(t *testing.T) { require.NotEqual("", node.Attributes["driver.mock_driver"]) } +func TestFingerprintManager_Run_ResourcesFingerprint(t *testing.T) { + driver.CheckForMockDriver(t) + t.Parallel() + require := require.New(t) + + node := &structs.Node{Resources: &structs.Resources{}} + + updateNode := func(r *cstructs.FingerprintResponse) *structs.Node { + if r.Resources != nil { + node.Resources.Merge(r.Resources) + } + return node + } + conf := config.DefaultConfig() + getConfig := func() *config.Config { + return conf + } + + fm := NewFingerprintManager( + getConfig, + node, + make(chan struct{}), + updateNode, + testLogger(), + ) + + err := fm.Run() + require.Nil(err) + require.NotEqual(0, node.Resources.CPU) + require.NotEqual(0, node.Resources.MemoryMB) + require.NotEqual(0, node.Resources.DiskMB) +} + func TestFingerprintManager_Fingerprint_Run(t *testing.T) { t.Parallel() require := require.New(t)