diff --git a/api/event.go b/api/event.go index b52f4eb8e8b..dce3c265fe8 100644 --- a/api/event.go +++ b/api/event.go @@ -6,11 +6,12 @@ import ( "fmt" ) -// Events is a set of events for a corresponding index. Events returned for the +// Ebvents is a set of events for a corresponding index. Events returned for the // index depend on which topics are subscribed to when a request is made. type Events struct { Index uint64 Events []Event + Err error } // Topic is an event Topic @@ -24,12 +25,12 @@ type Event struct { Key string FilterKeys []string Index uint64 - Payload interface{} + Payload map[string]interface{} } -// IsHeartBeat specifies if the event is an empty heartbeat used to +// IsHeartbeat specifies if the event is an empty heartbeat used to // keep a connection alive. -func (e *Events) IsHeartBeat() bool { +func (e *Events) IsHeartbeat() bool { return e.Index == 0 && len(e.Events) == 0 } @@ -45,14 +46,11 @@ func (c *Client) EventStream() *EventStream { // Stream establishes a new subscription to Nomad's event stream and streams // results back to the returned channel. -func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, <-chan error) { - - errCh := make(chan error, 1) +func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) { r, err := e.client.newRequest("GET", "/v1/event/stream") if err != nil { - errCh <- err - return nil, errCh + return nil, err } r.setQueryOptions(q) @@ -66,39 +64,35 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind _, resp, err := requireOK(e.client.doRequest(r)) if err != nil { - errCh <- err - return nil, errCh + return nil, err } eventsCh := make(chan *Events, 10) go func() { defer resp.Body.Close() + defer close(eventsCh) dec := json.NewDecoder(resp.Body) - for { - select { - case <-ctx.Done(): - close(eventsCh) - return - default: - } - + for ctx.Err() == nil { // Decode next newline delimited json of events var events Events if err := dec.Decode(&events); err != nil { - close(eventsCh) - errCh <- err + events = Events{Err: err} + eventsCh <- &events return } - if events.IsHeartBeat() { + if events.IsHeartbeat() { continue } - eventsCh <- &events - + select { + case <-ctx.Done(): + return + case eventsCh <- &events: + } } }() - return eventsCh, errCh + return eventsCh, nil } diff --git a/api/event_test.go b/api/event_test.go index 1ea1c6e4274..9bd4ae3a4dc 100644 --- a/api/event_test.go +++ b/api/event_test.go @@ -31,20 +31,49 @@ func TestEvent_Stream(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streamCh, errCh := events.Stream(ctx, topics, 0, q) + streamCh, err := events.Stream(ctx, topics, 0, q) + require.NoError(t, err) OUTER: for { select { case event := <-streamCh: + if event.Err != nil { + require.Fail(t, err.Error()) + } require.Equal(t, len(event.Events), 1) require.Equal(t, "Eval", string(event.Events[0].Topic)) break OUTER - case err := <-errCh: - require.Fail(t, err.Error()) case <-time.After(5 * time.Second): require.Fail(t, "failed waiting for event stream event") } } } + +func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) { + t.Parallel() + + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // register job to generate events + jobs := c.Jobs() + job := testJob() + resp2, _, err := jobs.Register(job, nil) + require.Nil(t, err) + require.NotNil(t, resp2) + + // build event stream request + events := c.EventStream() + q := &QueryOptions{} + topics := map[Topic][]string{ + "Eval": {"::*"}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, err = events.Stream(ctx, topics, 0, q) + require.Error(t, err) +} diff --git a/command/agent/command.go b/command/agent/command.go index 35ed0a9ff57..f924249a2f7 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -597,6 +597,7 @@ func (c *Command) AutocompleteFlags() complete.Flags { "-vault-tls-server-name": complete.PredictAnything, "-acl-enabled": complete.PredictNothing, "-acl-replication-token": complete.PredictAnything, + "-event-publisher": complete.PredictNothing, } } @@ -1280,6 +1281,9 @@ Server Options: -rejoin Ignore a previous leave and attempts to rejoin the cluster. + -event-publisher + Whether to enable or disable the servers event publisher. + Client Options: -client diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go index d6856396913..828fbb01ba1 100644 --- a/command/agent/event_endpoint.go +++ b/command/agent/event_endpoint.go @@ -157,7 +157,10 @@ func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) { func parseTopic(topic string) (string, string, error) { parts := strings.Split(topic, ":") - if len(parts) != 2 { + // infer wildcard if only given a topic + if len(parts) == 1 { + return topic, "*", nil + } else if len(parts) != 2 { return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic) } return parts[0], parts[1], nil diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index af86cce7cad..915029efc50 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -3,7 +3,6 @@ package agent import ( "context" "fmt" - "github.com/hashicorp/nomad/nomad/structs" "net/http" "net/http/httptest" "net/url" @@ -11,6 +10,8 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -95,9 +96,12 @@ func TestEventStream_QueryParse(t *testing.T) { wantErr: true, }, { - desc: "invalid key value formatting no value", + desc: "Infer wildcard if absent", query: "?topic=NodeDrain", - wantErr: true, + wantErr: false, + want: map[structs.Topic][]string{ + "NodeDrain": {"*"}, + }, }, { desc: "single topic and key", diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 1e1c94081f6..6c84d3783b6 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -97,7 +97,6 @@ func (e *Event) stream(conn io.ReadWriteCloser) { return } select { - case <-errCh: case <-ctx.Done(): return } @@ -105,7 +104,6 @@ func (e *Event) stream(conn io.ReadWriteCloser) { go func() { defer cancel() - LOOP: for { events, err := subscription.Next(ctx) if err != nil { @@ -113,7 +111,7 @@ func (e *Event) stream(conn io.ReadWriteCloser) { case errCh <- err: case <-ctx.Done(): } - break LOOP + return } // Continue if there are no events @@ -126,7 +124,7 @@ func (e *Event) stream(conn io.ReadWriteCloser) { case errCh <- err: case <-ctx.Done(): } - break LOOP + return } } }() diff --git a/vendor/github.com/hashicorp/nomad/api/event.go b/vendor/github.com/hashicorp/nomad/api/event.go index b52f4eb8e8b..dce3c265fe8 100644 --- a/vendor/github.com/hashicorp/nomad/api/event.go +++ b/vendor/github.com/hashicorp/nomad/api/event.go @@ -6,11 +6,12 @@ import ( "fmt" ) -// Events is a set of events for a corresponding index. Events returned for the +// Ebvents is a set of events for a corresponding index. Events returned for the // index depend on which topics are subscribed to when a request is made. type Events struct { Index uint64 Events []Event + Err error } // Topic is an event Topic @@ -24,12 +25,12 @@ type Event struct { Key string FilterKeys []string Index uint64 - Payload interface{} + Payload map[string]interface{} } -// IsHeartBeat specifies if the event is an empty heartbeat used to +// IsHeartbeat specifies if the event is an empty heartbeat used to // keep a connection alive. -func (e *Events) IsHeartBeat() bool { +func (e *Events) IsHeartbeat() bool { return e.Index == 0 && len(e.Events) == 0 } @@ -45,14 +46,11 @@ func (c *Client) EventStream() *EventStream { // Stream establishes a new subscription to Nomad's event stream and streams // results back to the returned channel. -func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, <-chan error) { - - errCh := make(chan error, 1) +func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) { r, err := e.client.newRequest("GET", "/v1/event/stream") if err != nil { - errCh <- err - return nil, errCh + return nil, err } r.setQueryOptions(q) @@ -66,39 +64,35 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind _, resp, err := requireOK(e.client.doRequest(r)) if err != nil { - errCh <- err - return nil, errCh + return nil, err } eventsCh := make(chan *Events, 10) go func() { defer resp.Body.Close() + defer close(eventsCh) dec := json.NewDecoder(resp.Body) - for { - select { - case <-ctx.Done(): - close(eventsCh) - return - default: - } - + for ctx.Err() == nil { // Decode next newline delimited json of events var events Events if err := dec.Decode(&events); err != nil { - close(eventsCh) - errCh <- err + events = Events{Err: err} + eventsCh <- &events return } - if events.IsHeartBeat() { + if events.IsHeartbeat() { continue } - eventsCh <- &events - + select { + case <-ctx.Done(): + return + case eventsCh <- &events: + } } }() - return eventsCh, errCh + return eventsCh, nil }