diff --git a/CHANGELOG.md b/CHANGELOG.md index c6973f2884a..684984b91d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ __BACKWARDS INCOMPATIBILITIES:__ IMPROVEMENTS: * core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)] + * core: Node events are emitted for events such as node registration and + heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)] * core: A set of features (Autopilot) has been added to allow for automatic operator-friendly management of Nomad servers. For more information about Autopilot, see the [Autopilot Guide](https://www.nomadproject.io/guides/cluster/autopilot.html). [[GH-3670](https://github.com/hashicorp/nomad/pull/3670)] * core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)] * cli: Node status and filesystem related commands do not require direct diff --git a/client/client.go b/client/client.go index 705efb524e7..c3565e8d232 100644 --- a/client/client.go +++ b/client/client.go @@ -81,6 +81,10 @@ const ( // allocSyncRetryIntv is the interval on which we retry updating // the status of the allocation allocSyncRetryIntv = 5 * time.Second + + // nodeEventsEmitIntv is the interval at which node events are synced with + // the server + nodeEventsEmitIntv = 3 * time.Second ) // ClientStatsReporter exposes all the APIs related to resource usage of a Nomad @@ -133,6 +137,10 @@ type Client struct { // update it. 
triggerNodeUpdate chan struct{} + // triggerEmitNodeEvent sends an event and triggers the client to update the + // server for the node event + triggerEmitNodeEvent chan *structs.NodeEvent + // discovered will be ticked whenever Consul discovery completes // successfully serversDiscoveredCh chan struct{} @@ -200,20 +208,21 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic // Create the client c := &Client{ - config: cfg, - consulCatalog: consulCatalog, - consulService: consulService, - start: time.Now(), - connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), - tlsWrap: tlsWrap, - streamingRpcs: structs.NewStreamingRpcRegistery(), - logger: logger, - allocs: make(map[string]*AllocRunner), - allocUpdates: make(chan *structs.Allocation, 64), - shutdownCh: make(chan struct{}), - triggerDiscoveryCh: make(chan struct{}), - triggerNodeUpdate: make(chan struct{}, 8), - serversDiscoveredCh: make(chan struct{}), + config: cfg, + consulCatalog: consulCatalog, + consulService: consulService, + start: time.Now(), + connPool: pool.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap), + tlsWrap: tlsWrap, + streamingRpcs: structs.NewStreamingRpcRegistery(), + logger: logger, + allocs: make(map[string]*AllocRunner), + allocUpdates: make(chan *structs.Allocation, 64), + shutdownCh: make(chan struct{}), + triggerDiscoveryCh: make(chan struct{}), + triggerNodeUpdate: make(chan struct{}, 8), + triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8), + serversDiscoveredCh: make(chan struct{}), } // Initialize the server manager @@ -1052,6 +1061,9 @@ func (c *Client) registerAndHeartbeat() { // Start watching changes for node changes go c.watchNodeUpdates() + // Start watching for emitting node events + go c.watchEmitEvents() + // Setup the heartbeat timer, for the initial registration // we want to do this quickly. We want to do it extra quickly // in development mode. 
@@ -1085,6 +1097,14 @@ func (c *Client) registerAndHeartbeat() {
 
 				// if heartbeating fails, trigger Consul discovery
 				c.triggerDiscovery()
+
+				// trigger a node event to register that the heartbeat failed
+				nodeEvent := &structs.NodeEvent{
+					Message:   fmt.Sprintf("Client heartbeat failed at %s", intv),
+					Subsystem: "Heartbeat",
+					Timestamp: time.Now().Unix(),
+				}
+				c.triggerNodeEvent(nodeEvent)
 			}
 		} else {
 			c.heartbeatLock.Lock()
@@ -1131,6 +1151,83 @@ func (c *Client) run() {
 	}
 }
 
+// submitNodeEvents sends the given batch of client-side node events to the
+// servers in a single Node.EmitEvent RPC, keyed by this node's ID. Examples of
+// these kinds of events include when a driver moves from healthy to unhealthy
+// (and vice versa). Any RPC error is logged and returned to the caller.
+func (c *Client) submitNodeEvents(e []*structs.NodeEvent) error {
+	node := c.Node()
+	nodeEvents := map[string][]*structs.NodeEvent{
+		node.ID: e,
+	}
+	req := structs.EmitNodeEventRequest{
+		NodeEvents:   nodeEvents,
+		WriteRequest: structs.WriteRequest{Region: c.Region()},
+	}
+	var resp structs.EmitNodeEventResponse
+	if err := c.RPC("Node.EmitEvent", &req, &resp); err != nil {
+		c.logger.Printf("[ERR] client: emitting node events failed %v", err)
+		return err
+	}
+	c.logger.Printf("[INFO] client: emit node events complete")
+	return nil
+}
+
+// watchEmitEvents receives node events from triggerEmitNodeEvent, batches
+// them, and submits the batch to the server each time the emit timer fires.
+// It runs until a receive on shutdownCh succeeds.
+func (c *Client) watchEmitEvents() {
+	// batchEvents holds the events that have not yet been accepted by the
+	// server. It is only touched by this goroutine, so it needs no lock.
+	var batchEvents []*structs.NodeEvent
+
+	timer := time.NewTimer(c.retryIntv(nodeEventsEmitIntv))
+	defer timer.Stop()
+
+	for {
+		// Block until there is work to do. There is deliberately no default
+		// case: it would turn this loop into a busy-wait.
+		select {
+		case event := <-c.triggerEmitNodeEvent:
+			batchEvents = append(batchEvents, event)
+
+		case <-timer.C:
+			timer.Reset(c.retryIntv(nodeEventsEmitIntv))
+
+			// Nothing was queued during this interval.
+			if len(batchEvents) == 0 {
+				continue
+			}
+
+			// submitNodeEvents logs any failure itself. Keep the batch on
+			// error so it is retried on the next tick; note that nothing
+			// bounds the batch while submissions keep failing.
+			if err := c.submitNodeEvents(batchEvents); err != nil {
+				continue
+			}
+
+			// The server accepted the batch, so start a fresh one rather
+			// than resubmitting the same events on every tick.
+			batchEvents = nil
+
+		case <-c.shutdownCh:
+			return
+		}
+	}
+}
+
+// triggerNodeEvent queues a node event for the watchEmitEvents goroutine. The
+// send never blocks: if the triggerEmitNodeEvent buffer is full the event is
+// dropped.
+func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) {
+	select {
+	case c.triggerEmitNodeEvent <- nodeEvent:
+		// the event was queued for the next batch
+	default:
+		// the buffer is full; drop the event rather than block the caller
+	}
+}
+
 // retryRegisterNode is used to register the node or update the registration and
 // retry in case of failure.
 func (c *Client) retryRegisterNode() {