Skip to content

Commit

Permalink
filter on additional filter keys, remove switch statement duplication
Browse files Browse the repository at this point in the history
properly wire up durable event count

move newline responsibility

moves newline creation from NDJson to the http handler, json stream only encodes and sends now

ignore snapshot restore if broker is disabled

enable dev mode to access event steam without acl

use mapping instead of switch

use pointers for config sizes, remove unused ttl, simplify closed conn logic
  • Loading branch information
drewbailey committed Oct 14, 2020
1 parent 7102820 commit 3c15f41
Show file tree
Hide file tree
Showing 38 changed files with 1,369 additions and 660 deletions.
4 changes: 2 additions & 2 deletions api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func (c *Client) EventStream() *EventStream {
// Stream establishes a new subscription to Nomad's event stream and streams
// results back to the returned channel.
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) {

r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
return nil, err
}
q = q.WithContext(ctx)
r.setQueryOptions(q)

// Build topic query params
Expand Down Expand Up @@ -82,7 +82,7 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
// select eventsCh
events = Events{Err: err}
}
if events.IsHeartbeat() {
if events.Err == nil && events.IsHeartbeat() {
continue
}

Expand Down
60 changes: 47 additions & 13 deletions api/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,15 @@ func TestEvent_Stream(t *testing.T) {
streamCh, err := events.Stream(ctx, topics, 0, q)
require.NoError(t, err)

OUTER:
for {
select {
case event := <-streamCh:
if event.Err != nil {
require.Fail(t, err.Error())
}
require.Equal(t, len(event.Events), 1)
require.Equal(t, "Eval", string(event.Events[0].Topic))

break OUTER
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
select {
case event := <-streamCh:
if event.Err != nil {
require.Fail(t, err.Error())
}
require.Equal(t, len(event.Events), 1)
require.Equal(t, "Eval", string(event.Events[0].Topic))
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}

Expand Down Expand Up @@ -76,4 +71,43 @@ func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {

_, err = events.Stream(ctx, topics, 0, q)
require.Error(t, err)
require.Contains(t, err.Error(), "400")
require.Contains(t, err.Error(), "Invalid key value pair")
}

func TestEvent_Stream_CloseCtx(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"*"},
}

ctx, cancel := context.WithCancel(context.Background())

streamCh, err := events.Stream(ctx, topics, 0, q)
require.NoError(t, err)

// cancel the request
cancel()

select {
case event, ok := <-streamCh:
require.False(t, ok)
require.Nil(t, event)
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
11 changes: 7 additions & 4 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,14 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
if agentConfig.Server.UpgradeVersion != "" {
conf.UpgradeVersion = agentConfig.Server.UpgradeVersion
}
if agentConfig.Server.EnableEventPublisher != nil {
conf.EnableEventPublisher = *agentConfig.Server.EnableEventPublisher
if agentConfig.Server.EnableEventBroker != nil {
conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker
}
if agentConfig.Server.EventBufferSize > 0 {
conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize)
if agentConfig.Server.EventBufferSize != nil {
conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize)
}
if agentConfig.Server.DurableEventCount != nil {
conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount)
}
if agentConfig.Autopilot != nil {
if agentConfig.Autopilot.CleanupDeadServers != nil {
Expand Down
3 changes: 2 additions & 1 deletion command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func TestAgent_ServerConfig(t *testing.T) {
out, err := a.serverConfig()
require.NoError(t, err)

require.True(t, out.EnableEventPublisher)
require.True(t, out.EnableEventBroker)
require.Equal(t, int64(100), out.DurableEventCount)

serfAddr := out.SerfConfig.MemberlistConfig.AdvertiseAddr
require.Equal(t, "127.0.0.1", serfAddr)
Expand Down
28 changes: 14 additions & 14 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,18 +484,18 @@ type ServerConfig struct {
// This value is ignored.
DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"`

// EnableEventPublisher configures whether this server's state store
// EnableEventBroker configures whether this server's state store
// will generate events for its event stream.
EnableEventPublisher *bool `hcl:"enable_event_publisher"`
EnableEventBroker *bool `hcl:"enable_event_broker"`

// EventBufferSize configure the amount of events to be held in memory.
// If EnableEventPublisher is set to true, the minimum allowable value
// If EnableEventBroker is set to true, the minimum allowable value
// for the EventBufferSize is 1.
EventBufferSize int `hcl:"event_buffer_size"`
EventBufferSize *int `hcl:"event_buffer_size"`

// DurableEventCount specifies the amount of events to persist during snapshot generation.
// A count of 0 signals that no events should be persisted.
DurableEventCount int `hcl:"durable_event_count"`
DurableEventCount *int `hcl:"durable_event_count"`

// ExtraKeysHCL is used by hcl to surface unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
Expand Down Expand Up @@ -887,11 +887,11 @@ func DefaultConfig() *Config {
BindWildcardDefaultHostNetwork: true,
},
Server: &ServerConfig{
Enabled: false,
EnableEventPublisher: helper.BoolToPtr(true),
EventBufferSize: 100,
DurableEventCount: 100,
StartJoin: []string{},
Enabled: false,
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(100),
DurableEventCount: helper.IntToPtr(100),
StartJoin: []string{},
ServerJoin: &ServerJoin{
RetryJoin: []string{},
RetryInterval: 30 * time.Second,
Expand Down Expand Up @@ -1415,15 +1415,15 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin)
}

if b.EnableEventPublisher != nil {
result.EnableEventPublisher = b.EnableEventPublisher
if b.EnableEventBroker != nil {
result.EnableEventBroker = b.EnableEventBroker
}

if b.EventBufferSize != 0 {
if b.EventBufferSize != nil {
result.EventBufferSize = b.EventBufferSize
}

if b.DurableEventCount != 0 {
if b.DurableEventCount != nil {
result.DurableEventCount = b.DurableEventCount
}

Expand Down
6 changes: 3 additions & 3 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ var basicConfig = &Config{
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
EnableEventPublisher: helper.BoolToPtr(false),
EventBufferSize: 200,
DurableEventCount: 100,
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(200),
DurableEventCount: helper.IntToPtr(0),
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
Expand Down
47 changes: 34 additions & 13 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func TestConfig_Merge(t *testing.T) {
MaxHeartbeatsPerSecond: 30.0,
RedundancyZone: "foo",
UpgradeVersion: "foo",
EnableEventPublisher: helper.BoolToPtr(false),
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -329,7 +331,9 @@ func TestConfig_Merge(t *testing.T) {
NonVotingServer: true,
RedundancyZone: "bar",
UpgradeVersion: "bar",
EnableEventPublisher: helper.BoolToPtr(true),
EnableEventBroker: helper.BoolToPtr(true),
DurableEventCount: helper.IntToPtr(100),
EventBufferSize: helper.IntToPtr(100),
},
ACL: &ACLConfig{
Enabled: true,
Expand Down Expand Up @@ -1166,40 +1170,57 @@ func TestTelemetry_Parse(t *testing.T) {
require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
}

func TestEventPublisher_Parse(t *testing.T) {
func TestEventBroker_Parse(t *testing.T) {

require := require.New(t)

{
a := &ServerConfig{
EnableEventPublisher: helper.BoolToPtr(false),
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventPublisher = nil
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(false, *result.EnableEventPublisher)
require.Equal(false, *result.EnableEventBroker)
require.Equal(0, *result.EventBufferSize)
require.Equal(0, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventPublisher: helper.BoolToPtr(true),
EnableEventBroker: helper.BoolToPtr(true),
EventBufferSize: helper.IntToPtr(5000),
DurableEventCount: helper.IntToPtr(200),
}
b := DefaultConfig().Server
b.EnableEventPublisher = nil
b.EnableEventBroker = nil
b.EventBufferSize = nil
b.DurableEventCount = nil

result := a.Merge(b)
require.Equal(true, *result.EnableEventPublisher)
require.Equal(true, *result.EnableEventBroker)
require.Equal(5000, *result.EventBufferSize)
require.Equal(200, *result.DurableEventCount)
}

{
a := &ServerConfig{
EnableEventPublisher: helper.BoolToPtr(false),
EnableEventBroker: helper.BoolToPtr(false),
EventBufferSize: helper.IntToPtr(0),
DurableEventCount: helper.IntToPtr(0),
}
b := DefaultConfig().Server
b.EnableEventPublisher = helper.BoolToPtr(true)
b.EnableEventBroker = helper.BoolToPtr(true)
b.EventBufferSize = helper.IntToPtr(20000)
b.DurableEventCount = helper.IntToPtr(1000)

result := a.Merge(b)
require.Equal(true, *result.EnableEventPublisher)
require.Equal(true, *result.EnableEventBroker)
require.Equal(20000, *result.EventBufferSize)
require.Equal(1000, *result.DurableEventCount)
}
}
45 changes: 18 additions & 27 deletions command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/docker/docker/pkg/ioutils"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/structs"
"golang.org/x/sync/errgroup"
)

func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand All @@ -40,12 +41,12 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("Cache-Control", "no-cache")

// Set region, namespace and authtoken to args
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)

// Make the RPC
// Determine the RPC handler to use to find a server
var handler structs.StreamingRpcHandler
var handlerErr error

if server := s.agent.Server(); server != nil {
handler, handlerErr = server.StreamingRpcHandler("Event.Stream")
} else if client := s.agent.Client(); client != nil {
Expand Down Expand Up @@ -73,57 +74,51 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)

// create an error channel to handle errors
errCh := make(chan HTTPCodedError, 1)

go func() {
// send request and decode events
errs, errCtx := errgroup.WithContext(ctx)
errs.Go(func() error {
defer cancel()

// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
return CodedError(500, err.Error())
}

for {
select {
case <-ctx.Done():
errCh <- nil
return
case <-errCtx.Done():
return nil
default:
}

// Decode the response
var res structs.EventStreamWrapper
if err := decoder.Decode(&res); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
errCh <- CodedError(500, err.Error())
return
return CodedError(500, err.Error())
}
decoder.Reset(httpPipe)

if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
return CodedError(int(*err.Code), err.Error())
}
}

// Flush json entry to response
if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil {
errCh <- CodedError(500, err.Error())
return
return CodedError(500, err.Error())
}
// Each entry is its own new line according to ndjson.org
// append new line to each entry
fmt.Fprint(output, "\n")
}
}()
})

// invoke handler
handler(handlerPipe)
cancel()
codedErr := <-errCh

codedErr := errs.Wait()
if codedErr != nil && strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error()) {
codedErr = nil
}
Expand All @@ -144,11 +139,7 @@ func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) {
return nil, fmt.Errorf("error parsing topics: %w", err)
}

if topics[structs.Topic(k)] == nil {
topics[structs.Topic(k)] = []string{v}
} else {
topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v)
}
topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v)
}
return topics, nil
}
Expand Down
Loading

0 comments on commit 3c15f41

Please sign in to comment.