Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event stream MVP #9013

Merged
merged 19 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.13.0 (Unreleased)

FEATURES:

* **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)]

IMPROVEMENTS:
* core: Improved job deregistration error logging. [[GH-8745](https://github.com/hashicorp/nomad/issues/8745)]
* api: Added support for cancellation contexts to HTTP API. [[GH-8836](https://github.com/hashicorp/nomad/issues/8836)]
Expand Down
98 changes: 98 additions & 0 deletions api/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package api

import (
"context"
"encoding/json"
"fmt"
)

// Events is a set of events for a corresponding index. Events returned for the
// index depend on which topics are subscribed to when a request is made.
type Events struct {
Index uint64
Events []Event
Err error
}

// Topic is an event Topic
type Topic string

// Event holds information related to an event that occurred in Nomad.
// The Payload is a hydrated object related to the Topic
type Event struct {
Topic Topic
Type string
Key string
FilterKeys []string
Index uint64
Payload map[string]interface{}
}

// IsHeartbeat specifies if the event is an empty heartbeat used to
// keep a connection alive.
func (e *Events) IsHeartbeat() bool {
return e.Index == 0 && len(e.Events) == 0
}

// EventStream is used to stream events from Nomad
type EventStream struct {
client *Client
}

// EventStream returns a handle to the Events endpoint
func (c *Client) EventStream() *EventStream {
return &EventStream{client: c}
}

// Stream establishes a new subscription to Nomad's event stream and streams
// results back to the returned channel.
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) {
r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
return nil, err
}
q = q.WithContext(ctx)
r.setQueryOptions(q)

// Build topic query params
for topic, keys := range topics {
for _, k := range keys {
r.params.Add("topic", fmt.Sprintf("%s:%s", topic, k))
}
}

_, resp, err := requireOK(e.client.doRequest(r))

if err != nil {
return nil, err
}

eventsCh := make(chan *Events, 10)
go func() {
defer resp.Body.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we need to cancel the request as well (by setting the request ctx field) and consume what's left of the response body before closing the body. I don't recall http.Get semantics exactly now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've only seen us defer the closing of response body, I think if anything this would result in an event getting dropped after the user intended to cancel the stream, which seems ok?

defer close(eventsCh)

dec := json.NewDecoder(resp.Body)

for ctx.Err() == nil {
// Decode next newline delimited json of events
var events Events
if err := dec.Decode(&events); err != nil {
// set error and fallthrough to
// select eventsCh
events = Events{Err: err}
}
if events.Err == nil && events.IsHeartbeat() {
continue
}

select {
case <-ctx.Done():
return
case eventsCh <- &events:
}
}
}()

return eventsCh, nil
}
113 changes: 113 additions & 0 deletions api/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package api

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestEvent_Stream(t *testing.T) {
schmichael marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"*"},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

streamCh, err := events.Stream(ctx, topics, 0, q)
require.NoError(t, err)

select {
case event := <-streamCh:
if event.Err != nil {
require.Fail(t, err.Error())
}
require.Equal(t, len(event.Events), 1)
require.Equal(t, "Eval", string(event.Events[0].Topic))
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}

func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"::*"},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, err = events.Stream(ctx, topics, 0, q)
require.Error(t, err)
drewbailey marked this conversation as resolved.
Show resolved Hide resolved
require.Contains(t, err.Error(), "400")
require.Contains(t, err.Error(), "Invalid key value pair")
}

func TestEvent_Stream_CloseCtx(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"*"},
}

ctx, cancel := context.WithCancel(context.Background())

streamCh, err := events.Stream(ctx, topics, 0, q)
require.NoError(t, err)

// cancel the request
cancel()

select {
case event, ok := <-streamCh:
require.False(t, ok)
require.Nil(t, event)
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
9 changes: 9 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.UpgradeVersion != "" {
conf.UpgradeVersion = agentConfig.Server.UpgradeVersion
}
if agentConfig.Server.EnableEventBroker != nil {
conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker
}
if agentConfig.Server.EventBufferSize != nil {
conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize)
}
if agentConfig.Server.DurableEventCount != nil {
conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount)
}
if agentConfig.Autopilot != nil {
if agentConfig.Autopilot.CleanupDeadServers != nil {
conf.AutopilotConfig.CleanupDeadServers = *agentConfig.Autopilot.CleanupDeadServers
Expand Down
3 changes: 3 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func TestAgent_ServerConfig(t *testing.T) {
out, err := a.serverConfig()
require.NoError(t, err)

require.True(t, out.EnableEventBroker)
require.Equal(t, int64(100), out.DurableEventCount)

serfAddr := out.SerfConfig.MemberlistConfig.AdvertiseAddr
require.Equal(t, "127.0.0.1", serfAddr)

Expand Down
32 changes: 30 additions & 2 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,19 @@ type ServerConfig struct {
// This value is ignored.
DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"`

// EnableEventBroker configures whether this server's state store
// will generate events for its event stream.
EnableEventBroker *bool `hcl:"enable_event_broker"`

// EventBufferSize configure the amount of events to be held in memory.
// If EnableEventBroker is set to true, the minimum allowable value
// for the EventBufferSize is 1.
EventBufferSize *int `hcl:"event_buffer_size"`

// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
DurableEventCount *int `hcl:"durable_event_count"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
Expand Down Expand Up @@ -874,8 +887,11 @@ func DefaultConfig() *Config {
BindWildcardDefaultHostNetwork: true,
},
Server: &ServerConfig{
Enabled: false,
StartJoin: []string{},
Enabled: false,
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(100),
DurableEventCount: helper.IntToPtr(100),
StartJoin: []string{},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
RetryInterval: 30 * time.Second,
Expand Down Expand Up @@ -1399,6 +1415,18 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin)
}

if b.EnableEventBroker != nil {
result.EnableEventBroker = b.EnableEventBroker
}

if b.EventBufferSize != nil {
result.EventBufferSize = b.EventBufferSize
}

if b.DurableEventCount != nil {
result.DurableEventCount = b.DurableEventCount
}

if b.DefaultSchedulerConfig != nil {
c := *b.DefaultSchedulerConfig
result.DefaultSchedulerConfig = &c
Expand Down
3 changes: 3 additions & 0 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ var basicConfig = &Config{
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(200),
DurableEventCount: helper.IntToPtr(0),
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
Expand Down
61 changes: 61 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func TestConfig_Merge(t *testing.T) {
MaxHeartbeatsPerSecond: 30.0,
RedundancyZone: "foo",
UpgradeVersion: "foo",
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -328,6 +331,9 @@ func TestConfig_Merge(t *testing.T) {
NonVotingServer: true,
RedundancyZone: "bar",
UpgradeVersion: "bar",
EnableEventBroker: helper.BoolToPtr(true),
DurableEventCount: helper.IntToPtr(100),
EventBufferSize: helper.IntToPtr(100),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -1163,3 +1169,58 @@ func TestTelemetry_Parse(t *testing.T) {
require.Exactly([]string{"+nomad.raft"}, config.Telemetry.PrefixFilter)
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
}

func TestEventBroker_Parse(t *testing.T) {

require := require.New(t)
{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(false, *result.EnableEventBroker)
require.Equal(0, *result.EventBufferSize)
require.Equal(0, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(5000),
DurableEventCount: helper.IntToPtr(200),
}
b := DefaultConfig().Server
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(5000, *result.EventBufferSize)
require.Equal(200, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventBroker = helper.BoolToPtr(true)
b.EventBufferSize = helper.IntToPtr(20000)
b.DurableEventCount = helper.IntToPtr(1000)

result := a.Merge(b)
require.Equal(true, *result.EnableEventBroker)
require.Equal(20000, *result.EventBufferSize)
require.Equal(1000, *result.DurableEventCount)
}
}
Loading