Skip to content

Commit

Permalink
address pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 7, 2020
1 parent 65bcca6 commit 9925e7d
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 61 deletions.
44 changes: 19 additions & 25 deletions api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"fmt"
)

// Events is a set of events for a corresponding index. Events returned for the
// Ebvents is a set of events for a corresponding index. Events returned for the
// index depend on which topics are subscribed to when a request is made.
type Events struct {
Index uint64
Events []Event
Err error
}

// Topic is an event Topic
Expand All @@ -24,12 +25,12 @@ type Event struct {
Key string
FilterKeys []string
Index uint64
Payload interface{}
Payload map[string]interface{}
}

// IsHeartBeat specifies if the event is an empty heartbeat used to
// IsHeartbeat specifies if the event is an empty heartbeat used to
// keep a connection alive.
func (e *Events) IsHeartBeat() bool {
func (e *Events) IsHeartbeat() bool {
return e.Index == 0 && len(e.Events) == 0
}

Expand All @@ -45,14 +46,11 @@ func (c *Client) EventStream() *EventStream {

// Stream establishes a new subscription to Nomad's event stream and streams
// results back to the returned channel.
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, <-chan error) {

errCh := make(chan error, 1)
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) {

r, err := e.client.newRequest("GET", "/v1/event/stream")
if err != nil {
errCh <- err
return nil, errCh
return nil, err
}
r.setQueryOptions(q)

Expand All @@ -66,39 +64,35 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
_, resp, err := requireOK(e.client.doRequest(r))

if err != nil {
errCh <- err
return nil, errCh
return nil, err
}

eventsCh := make(chan *Events, 10)
go func() {
defer resp.Body.Close()
defer close(eventsCh)

dec := json.NewDecoder(resp.Body)

for {
select {
case <-ctx.Done():
close(eventsCh)
return
default:
}

for ctx.Err() == nil {
// Decode next newline delimited json of events
var events Events
if err := dec.Decode(&events); err != nil {
close(eventsCh)
errCh <- err
events = Events{Err: err}
eventsCh <- &events
return
}
if events.IsHeartBeat() {
if events.IsHeartbeat() {
continue
}

eventsCh <- &events

select {
case <-ctx.Done():
return
case eventsCh <- &events:
}
}
}()

return eventsCh, errCh
return eventsCh, nil
}
35 changes: 32 additions & 3 deletions api/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,49 @@ func TestEvent_Stream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

streamCh, errCh := events.Stream(ctx, topics, 0, q)
streamCh, err := events.Stream(ctx, topics, 0, q)
require.NoError(t, err)

OUTER:
for {
select {
case event := <-streamCh:
if event.Err != nil {
require.Fail(t, err.Error())
}
require.Equal(t, len(event.Events), 1)
require.Equal(t, "Eval", string(event.Events[0].Topic))

break OUTER
case err := <-errCh:
require.Fail(t, err.Error())
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
}
}
}

func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()

// register job to generate events
jobs := c.Jobs()
job := testJob()
resp2, _, err := jobs.Register(job, nil)
require.Nil(t, err)
require.NotNil(t, resp2)

// build event stream request
events := c.EventStream()
q := &QueryOptions{}
topics := map[Topic][]string{
"Eval": {"::*"},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, err = events.Stream(ctx, topics, 0, q)
require.Error(t, err)
}
4 changes: 4 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ func (c *Command) AutocompleteFlags() complete.Flags {
"-vault-tls-server-name": complete.PredictAnything,
"-acl-enabled": complete.PredictNothing,
"-acl-replication-token": complete.PredictAnything,
"-event-publisher": complete.PredictNothing,
}
}

Expand Down Expand Up @@ -1280,6 +1281,9 @@ Server Options:
-rejoin
Ignore a previous leave and attempts to rejoin the cluster.
-event-publisher
Whether to enable or disable the servers event publisher.
Client Options:
-client
Expand Down
5 changes: 4 additions & 1 deletion command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) {

func parseTopic(topic string) (string, string, error) {
parts := strings.Split(topic, ":")
if len(parts) != 2 {
// infer wildcard if only given a topic
if len(parts) == 1 {
return topic, "*", nil
} else if len(parts) != 2 {
return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic)
}
return parts[0], parts[1], nil
Expand Down
10 changes: 7 additions & 3 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package agent
import (
"context"
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/nomad/structs"

"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -95,9 +96,12 @@ func TestEventStream_QueryParse(t *testing.T) {
wantErr: true,
},
{
desc: "invalid key value formatting no value",
desc: "Infer wildcard if absent",
query: "?topic=NodeDrain",
wantErr: true,
wantErr: false,
want: map[structs.Topic][]string{
"NodeDrain": {"*"},
},
},
{
desc: "single topic and key",
Expand Down
6 changes: 2 additions & 4 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,21 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
return
}
select {
case <-errCh:
case <-ctx.Done():
return
}
}()

go func() {
defer cancel()
LOOP:
for {
events, err := subscription.Next(ctx)
if err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
return
}

// Continue if there are no events
Expand All @@ -126,7 +124,7 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
return
}
}
}()
Expand Down
44 changes: 19 additions & 25 deletions vendor/github.com/hashicorp/nomad/api/event.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 9925e7d

Please sign in to comment.