Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes races accessing node and updating it during fingerprinting #4166

Merged
merged 7 commits into from
Apr 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,13 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
return nil, fmt.Errorf("node setup failed: %v", err)
}

fingerprintManager := NewFingerprintManager(c.GetConfig, c.config.Node,
// Store the config copy before restoring state but after it has been
// initialized.
c.configLock.Lock()
c.configCopy = c.config.Copy()
c.configLock.Unlock()

fingerprintManager := NewFingerprintManager(c.GetConfig, c.configCopy.Node,
c.shutdownCh, c.updateNodeFromFingerprint, c.updateNodeFromDriver,
c.logger)

Expand All @@ -271,12 +277,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Setup the reserved resources
c.reservePorts()

// Store the config copy before restoring state but after it has been
// initialized.
c.configLock.Lock()
c.configCopy = c.config.Copy()
c.configLock.Unlock()

// Set the preconfigured list of static servers
c.configLock.RLock()
if len(c.configCopy.Servers) > 0 {
Expand Down Expand Up @@ -437,7 +437,7 @@ func (c *Client) Leave() error {
func (c *Client) GetConfig() *config.Config {
c.configLock.Lock()
defer c.configLock.Unlock()
return c.config
return c.configCopy
}

// Datacenter returns the datacenter for the given client
Expand Down Expand Up @@ -726,7 +726,7 @@ func (c *Client) restoreState() error {
watcher := noopPrevAlloc{}

c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, watcher)
c.configLock.RUnlock()

c.allocLock.Lock()
Expand Down Expand Up @@ -963,6 +963,9 @@ func (c *Client) reservePorts() {
for _, net := range reservedIndex {
node.Reserved.Networks = append(node.Reserved.Networks, net)
}

// Make the changes available to the config copy.
c.configCopy = c.config.Copy()
}

// updateNodeFromFingerprint updates the node with the result of
Expand Down Expand Up @@ -1009,10 +1012,10 @@ func (c *Client) updateNodeFromFingerprint(response *cstructs.FingerprintRespons
}

if nodeHasChanged {
c.updateNode()
c.updateNodeLocked()
}

return c.config.Node
return c.configCopy.Node
}

// updateNodeFromDriver receives either a fingerprint of the driver or its
Expand Down Expand Up @@ -1104,10 +1107,10 @@ func (c *Client) updateNodeFromDriver(name string, fingerprint, health *structs.

if hasChanged {
c.config.Node.Drivers[name].UpdateTime = time.Now()
c.updateNode()
c.updateNodeLocked()
}

return c.config.Node
return c.configCopy.Node
}

// resourcesAreEqual is a temporary function to compare whether resources are
Expand Down Expand Up @@ -1752,9 +1755,14 @@ OUTER:
}
}

// updateNode triggers a client to update its node copy if it isn't doing
// so already
func (c *Client) updateNode() {
// updateNode updates the Node copy and triggers the client to send the updated
// Node to the server. This should be done while the caller holds the
// configLock lock.
func (c *Client) updateNodeLocked() {
// Update the config copy.
node := c.config.Node.Copy()
c.configCopy.Node = node

select {
case c.triggerNodeUpdate <- struct{}{}:
// Node update goroutine was released to execute
Expand All @@ -1774,15 +1782,7 @@ func (c *Client) watchNodeUpdates() {
select {
case <-timer.C:
c.logger.Printf("[DEBUG] client: state changed, updating node and re-registering.")

// Update the config copy.
c.configLock.Lock()
node := c.config.Node.Copy()
c.configCopy.Node = node
c.configLock.Unlock()

c.retryRegisterNode()

hasChanged = false
case <-c.triggerNodeUpdate:
if hasChanged {
Expand Down Expand Up @@ -1899,7 +1899,10 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error
c.configLock.RLock()
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)

ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
// Copy the config since the node can be swapped out as it is being updated.
// The long term fix is to pass in the config and node separately and then
// we don't have to do a copy.
ar := NewAllocRunner(c.logger, c.configCopy.Copy(), c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
c.configLock.RUnlock()

// Store the alloc runner.
Expand Down
10 changes: 8 additions & 2 deletions client/fingerprint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ func NewFingerprintManager(getConfig func() *config.Config,
func (fm *FingerprintManager) setNode(node *structs.Node) {
fm.nodeLock.Lock()
defer fm.nodeLock.Unlock()

fm.node = node
}

// getNode returns the current client node
func (fm *FingerprintManager) getNode() *structs.Node {
fm.nodeLock.Lock()
defer fm.nodeLock.Unlock()
return fm.node
}

// Run starts the process of fingerprinting the node. It does an initial pass,
// identifying whitelisted and blacklisted fingerprints/drivers. Then, for
// those which require periotic checking, it starts a periodic process for
Expand Down Expand Up @@ -167,7 +173,7 @@ func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error {
// supported
func (fm *FingerprintManager) setupDrivers(drivers []string) error {
var availDrivers []string
driverCtx := driver.NewDriverContext("", "", fm.getConfig(), fm.node, fm.logger, nil)
driverCtx := driver.NewDriverContext("", "", fm.getConfig(), fm.getNode(), fm.logger, nil)
for _, name := range drivers {

d, err := driver.NewDriver(name, driverCtx)
Expand Down
13 changes: 13 additions & 0 deletions nomad/structs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package structs

import (
"time"

"github.com/hashicorp/nomad/helper"
)

// DriverInfo is the current state of a single driver. This is updated
Expand All @@ -14,6 +16,17 @@ type DriverInfo struct {
UpdateTime time.Time
}

func (di *DriverInfo) Copy() *DriverInfo {
if di == nil {
return nil
}

cdi := new(DriverInfo)
*cdi = *di
cdi.Attributes = helper.CopyMapStringString(di.Attributes)
return cdi
}

// MergeHealthCheck merges information from a health check for a drier into a
// node's driver info
func (di *DriverInfo) MergeHealthCheck(other *DriverInfo) {
Expand Down
15 changes: 15 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,7 @@ func (n *Node) Copy() *Node {
nn.Meta = helper.CopyMapStringString(nn.Meta)
nn.Events = copyNodeEvents(n.Events)
nn.DrainStrategy = nn.DrainStrategy.Copy()
nn.Drivers = copyNodeDrivers(n.Drivers)
return nn
}

Expand All @@ -1478,6 +1479,20 @@ func copyNodeEvents(events []*NodeEvent) []*NodeEvent {
return c
}

// copyNodeDrivers is a helper to copy a map of DriverInfo
func copyNodeDrivers(drivers map[string]*DriverInfo) map[string]*DriverInfo {
l := len(drivers)
if l == 0 {
return nil
}

c := make(map[string]*DriverInfo, l)
for driver, info := range drivers {
c[driver] = info.Copy()
}
return c
}

// TerminalStatus returns if the current status is terminal and
// will no longer transition.
func (n *Node) TerminalStatus() bool {
Expand Down
77 changes: 77 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3676,3 +3676,80 @@ func TestNode_Canonicalize(t *testing.T) {
node.Canonicalize()
require.Equal(NodeSchedulingIneligible, node.SchedulingEligibility)
}

func TestNode_Copy(t *testing.T) {
t.Parallel()
require := require.New(t)

node := &Node{
ID: uuid.Generate(),
SecretID: uuid.Generate(),
Datacenter: "dc1",
Name: "foobar",
Attributes: map[string]string{
"kernel.name": "linux",
"arch": "x86",
"nomad.version": "0.5.0",
"driver.exec": "1",
"driver.mock_driver": "1",
},
Resources: &Resources{
CPU: 4000,
MemoryMB: 8192,
DiskMB: 100 * 1024,
IOPS: 150,
Networks: []*NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
},
Reserved: &Resources{
CPU: 100,
MemoryMB: 256,
DiskMB: 4 * 1024,
Networks: []*NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []Port{{Label: "ssh", Value: 22}},
MBits: 1,
},
},
},
Links: map[string]string{
"consul": "foobar.dc1",
},
Meta: map[string]string{
"pci-dss": "true",
"database": "mysql",
"version": "5.6",
},
NodeClass: "linux-medium-pci",
Status: NodeStatusReady,
SchedulingEligibility: NodeSchedulingEligible,
Drivers: map[string]*DriverInfo{
"mock_driver": &DriverInfo{
Attributes: map[string]string{"running": "1"},
Detected: true,
Healthy: true,
HealthDescription: "Currently active",
UpdateTime: time.Now(),
},
},
}
node.ComputeClass()

node2 := node.Copy()

require.Equal(node.Attributes, node2.Attributes)
require.Equal(node.Resources, node2.Resources)
require.Equal(node.Reserved, node2.Reserved)
require.Equal(node.Links, node2.Links)
require.Equal(node.Meta, node2.Meta)
require.Equal(node.Events, node2.Events)
require.Equal(node.DrainStrategy, node2.DrainStrategy)
require.Equal(node.Drivers, node2.Drivers)
}