Skip to content

Commit

Permalink
Merge pull request #9013 from hashicorp/event-stream
Browse files Browse the repository at this point in the history
Event stream
  • Loading branch information
drewbailey authored Oct 14, 2020
2 parents 349a62d + 8c5f3a0 commit d68e3d4
Show file tree
Hide file tree
Showing 66 changed files with 5,729 additions and 344 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.13.0 (Unreleased)

FEATURES:

* **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)]

IMPROVEMENTS:
* core: Improved job deregistration error logging. [[GH-8745](https://github.com/hashicorp/nomad/issues/8745)]
* api: Added support for cancellation contexts to HTTP API. [[GH-8836](https://github.com/hashicorp/nomad/issues/8836)]
Expand Down
98 changes: 98 additions & 0 deletions api/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package api

import (
"context"
"encoding/json"
"fmt"
)

// Events is a set of events for a corresponding index. Events returned for the
// index depend on which topics are subscribed to when a request is made.
type Events struct {
Index uint64
Events []Event
Err error
}

// Topic is an event Topic
type Topic string

// Event holds information related to an event that occurred in Nomad.
// The Payload is a hydrated object related to the Topic
type Event struct {
Topic Topic
Type string
Key string
FilterKeys []string
Index uint64
Payload map[string]interface{}
}

// IsHeartbeat specifies if the event is an empty heartbeat used to
// keep a connection alive.
func (e *Events) IsHeartbeat() bool {
return e.Index == 0 && len(e.Events) == 0
}

// EventStream is used to stream events from Nomad
type EventStream struct {
client *Client
}

// EventStream returns a handle to the Events endpoint
func (c *Client) EventStream() *EventStream {
return &EventStream{client: c}
}

// Stream establishes a new subscription to Nomad's event stream and streams
// results back to the returned channel.
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) {
r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
return nil, err
}
q = q.WithContext(ctx)
r.setQueryOptions(q)

// Build topic query params
for topic, keys := range topics {
for _, k := range keys {
r.params.Add("topic", fmt.Sprintf("%s:%s", topic, k))
}
}

_, resp, err := requireOK(e.client.doRequest(r))

if err != nil {
return nil, err
}

eventsCh := make(chan *Events, 10)
go func() {
defer resp.Body.Close()
defer close(eventsCh)

dec := json.NewDecoder(resp.Body)

for ctx.Err() == nil {
// Decode next newline delimited json of events
var events Events
if err := dec.Decode(&events); err != nil {
// set error and fallthrough to
// select eventsCh
events = Events{Err: err}
}
if events.Err == nil && events.IsHeartbeat() {
continue
}

select {
case <-ctx.Done():
return
case eventsCh <- &events:
}
}
}()

return eventsCh, nil
}
113 changes: 113 additions & 0 deletions api/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package api

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestEvent_Stream(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"*"},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

streamCh, err := events.Stream(ctx, topics, 0, q)
require.NoError(t, err)

select {
case event := <-streamCh:
if event.Err != nil {
require.Fail(t, err.Error())
}
require.Equal(t, len(event.Events), 1)
require.Equal(t, "Eval", string(event.Events[0].Topic))
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}

func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"::*"},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, err = events.Stream(ctx, topics, 0, q)
require.Error(t, err)
require.Contains(t, err.Error(), "400")
require.Contains(t, err.Error(), "Invalid key value pair")
}

func TestEvent_Stream_CloseCtx(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"*"},
}

ctx, cancel := context.WithCancel(context.Background())

streamCh, err := events.Stream(ctx, topics, 0, q)
require.NoError(t, err)

// cancel the request
cancel()

select {
case event, ok := <-streamCh:
require.False(t, ok)
require.Nil(t, event)
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
9 changes: 9 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.UpgradeVersion != "" {
conf.UpgradeVersion = agentConfig.Server.UpgradeVersion
}
if agentConfig.Server.EnableEventBroker != nil {
conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker
}
if agentConfig.Server.EventBufferSize != nil {
conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize)
}
if agentConfig.Server.DurableEventCount != nil {
conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount)
}
if agentConfig.Autopilot != nil {
if agentConfig.Autopilot.CleanupDeadServers != nil {
conf.AutopilotConfig.CleanupDeadServers = *agentConfig.Autopilot.CleanupDeadServers
Expand Down
3 changes: 3 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func TestAgent_ServerConfig(t *testing.T) {
out, err := a.serverConfig()
require.NoError(t, err)

require.True(t, out.EnableEventBroker)
require.Equal(t, int64(100), out.DurableEventCount)

serfAddr := out.SerfConfig.MemberlistConfig.AdvertiseAddr
require.Equal(t, "127.0.0.1", serfAddr)

Expand Down
32 changes: 30 additions & 2 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,19 @@ type ServerConfig struct {
// This value is ignored.
DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"`

// EnableEventBroker configures whether this server's state store
// will generate events for its event stream.
EnableEventBroker *bool `hcl:"enable_event_broker"`

// EventBufferSize configure the amount of events to be held in memory.
// If EnableEventBroker is set to true, the minimum allowable value
// for the EventBufferSize is 1.
EventBufferSize *int `hcl:"event_buffer_size"`

// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
DurableEventCount *int `hcl:"durable_event_count"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
Expand Down Expand Up @@ -874,8 +887,11 @@ func DefaultConfig() *Config {
BindWildcardDefaultHostNetwork: true,
},
Server: &ServerConfig{
Enabled: false,
StartJoin: []string{},
Enabled: false,
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(100),
DurableEventCount: helper.IntToPtr(100),
StartJoin: []string{},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
RetryInterval: 30 * time.Second,
Expand Down Expand Up @@ -1399,6 +1415,18 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin)
}

if b.EnableEventBroker != nil {
result.EnableEventBroker = b.EnableEventBroker
}

if b.EventBufferSize != nil {
result.EventBufferSize = b.EventBufferSize
}

if b.DurableEventCount != nil {
result.DurableEventCount = b.DurableEventCount
}

if b.DefaultSchedulerConfig != nil {
c := *b.DefaultSchedulerConfig
result.DefaultSchedulerConfig = &c
Expand Down
3 changes: 3 additions & 0 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ var basicConfig = &Config{
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(200),
DurableEventCount: helper.IntToPtr(0),
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
Expand Down
61 changes: 61 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func TestConfig_Merge(t *testing.T) {
MaxHeartbeatsPerSecond: 30.0,
RedundancyZone: "foo",
UpgradeVersion: "foo",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -328,6 +331,9 @@ func TestConfig_Merge(t *testing.T) {
NonVotingServer: true,
RedundancyZone: "bar",
UpgradeVersion: "bar",
EnableEventBroker: helper.BoolToPtr(true),
DurableEventCount: helper.IntToPtr(100),
EventBufferSize: helper.IntToPtr(100),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -1163,3 +1169,58 @@ func TestTelemetry_Parse(t *testing.T) {
require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter)
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
}

func TestEventBroker_Parse(t *testing.T) {

require := require.New(t)
{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(false, *result.EnableEventBroker)
require.Equal(0, *result.EventBufferSize)
require.Equal(0, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(5000),
DurableEventCount: helper.IntToPtr(200),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(5000, *result.EventBufferSize)
require.Equal(200, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = helper.BoolToPtr(true)
b.EventBufferSize = helper.IntToPtr(20000)
b.DurableEventCount = helper.IntToPtr(1000)

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(20000, *result.EventBufferSize)
require.Equal(1000, *result.DurableEventCount)
}
}
Loading

0 comments on commit d68e3d4

Please sign in to comment.