Skip to content

Commit

Permalink
add client side emitting of node events
Browse files Browse the repository at this point in the history
Changelog
  • Loading branch information
chelseakomlo committed Mar 9, 2018
1 parent 2676fc9 commit f93ea2a
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ __BACKWARDS INCOMPATIBILITIES:__

IMPROVEMENTS:
* core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)]
* core: Node events are emitted for events such as node registration and
heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)]
* core: A set of features (Autopilot) has been added to allow for automatic operator-friendly management of Nomad servers. For more information about Autopilot, see the [Autopilot Guide](https://www.nomadproject.io/guides/cluster/autopilot.html). [[GH-3670](https://github.com/hashicorp/nomad/pull/3670)]
* core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)]
* cli: Node status and filesystem related commands do not require direct
Expand Down
116 changes: 102 additions & 14 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second

// nodeEventsEmitIntv is the interval at which node events are synced with
// the server
nodeEventsEmitIntv = 3 * time.Second
)

// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
Expand Down Expand Up @@ -133,6 +137,10 @@ type Client struct {
// update it.
triggerNodeUpdate chan struct{}

// triggerEmitNodeEvent sends an event and triggers the client to update the
// server for the node event
triggerEmitNodeEvent chan *structs.NodeEvent

// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
Expand Down Expand Up @@ -200,20 +208,21 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic

// Create the client
c := &Client{
config: cfg,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistery(),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
config: cfg,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistery(),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
serversDiscoveredCh: make(chan struct{}),
}

// Initialize the server manager
Expand Down Expand Up @@ -1052,6 +1061,9 @@ func (c *Client) registerAndHeartbeat() {
// Start watching changes for node changes
go c.watchNodeUpdates()

// Start watching for emitting node events
go c.watchEmitEvents()

// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
// in development mode.
Expand Down Expand Up @@ -1085,6 +1097,14 @@ func (c *Client) registerAndHeartbeat() {

// if heartbeating fails, trigger Consul discovery
c.triggerDiscovery()

// trigger a node event to register that the heartbeat failed
nodeEvent := &structs.NodeEvent{
Message: fmt.Sprintf("Client heartbeat failed at %s", intv),
Subsystem: "Heartbeat",
Timestamp: time.Now().Unix(),
}
c.triggerNodeEvent(nodeEvent)
}
} else {
c.heartbeatLock.Lock()
Expand Down Expand Up @@ -1131,6 +1151,74 @@ func (c *Client) run() {
}
}

// submitNodeEvents is used to submit a client-side node event. Examples of
// these kinds of events include when a driver moves from healthy to unhealhty
// (and vice versa)
func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error {
node := c.Node()
nodeEvents := map[string][]*structs.NodeEvent{
node.ID: e,
}
req := structs.EmitNodeEventRequest{
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
var resp structs.EmitNodeEventResponse
if err := c.RPC("Node.EmitEvent", &req, &resp); err != nil {
c.logger.Printf("[ERR] client: emitting node events failed %v", err)
return err
}
c.logger.Printf("[INFO] client: emit node events complete")
return nil
}

// emitEvent is a handler which receives node events and on a interval and
// submits them in batch format to the server
func (c *Client) watchEmitEvents() {
batchEventsLock := sync.Mutex{}
batchEvents := make([]*structs.NodeEvent, 0)

timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv))
defer timer.Stop()

for {
select {
case event := <-c.triggerEmitNodeEvent:
batchEventsLock.Lock()
batchEvents = append(batchEvents, event)
batchEventsLock.Unlock()

case <-timer.C:
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))

batchEventsLock.Lock()
if len(batchEvents) == 0 {
// if we haven't received any events to emit, continue until the next
// time interval
batchEventsLock.Unlock()
continue
}

c.submitNodeEvents(batchEvents)
batchEventsLock.Unlock()

case <-c.shutdownCh:
return
default:
}
}
}

// emitEvent triggers a emit node event
func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) {
select {
case c.triggerEmitNodeEvent <- nodeEvent:
// emit node event goroutine was released to execute
default:
// emit node event goroutine was already running
}
}

// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
Expand Down

0 comments on commit f93ea2a

Please sign in to comment.