Skip to content

Commit

Permalink
Event Stream API/RPC (#8947)
Browse files Browse the repository at this point in the history
This Commit adds an /v1/events/stream endpoint to stream events from.

The stream framer has been updated to include a SendFull method which
does not fragment the data between multiple frames. This essentially
treats the stream framer as a envelope to adhere to the stream framer
interface in the UI.

If the `encode` query parameter is omitted events will be streamed as
newline delimted JSON.
  • Loading branch information
drewbailey committed Oct 13, 2020
1 parent 4d491fe commit ff7fc0f
Show file tree
Hide file tree
Showing 18 changed files with 1,129 additions and 31 deletions.
169 changes: 169 additions & 0 deletions command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package agent

import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/docker/docker/pkg/ioutils"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)

func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
query := req.URL.Query()

indexStr := query.Get("index")
if indexStr == "" {
indexStr = "0"
}
index, err := strconv.Atoi(indexStr)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unable to parse index: %v", err))
}

topics, err := parseEventTopics(query)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Invalid topic query: %v", err))
}

args := &structs.EventStreamRequest{
Topics: topics,
Index: index,
}
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("Cache-Control", "no-cache")

s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)

// Make the RPC
var handler structs.StreamingRpcHandler
var handlerErr error

if server := s.agent.Server(); server != nil {
handler, handlerErr = server.StreamingRpcHandler("Event.Stream")
} else if client := s.agent.Client(); client != nil {
handler, handlerErr = client.RemoteStreamingRpcHandler("Event.Stream")
} else {
handlerErr = fmt.Errorf("misconfigured connection")
}

if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}

httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)

// Create a goroutine that closes the pipe if the connection closes
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
go func() {
<-ctx.Done()
httpPipe.Close()
}()

// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)

// create an error channel to handle errors
errCh := make(chan HTTPCodedError, 2)

go func() {
defer cancel()

// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}

for {
select {
case <-ctx.Done():
errCh <- nil
return
default:
}

// Decode the response
var res structs.EventStreamWrapper
if err := decoder.Decode(&res); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
errCh <- CodedError(500, err.Error())
return
}
decoder.Reset(httpPipe)

if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
}
}

// Flush json entry to response
if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
}()

// invoke handler
handler(handlerPipe)
cancel()
codedErr := <-errCh

if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error())) {
codedErr = nil
}

return nil, codedErr
}

func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) {
raw, ok := query["topic"]
if !ok {
return allTopics(), nil
}
topics := make(map[stream.Topic][]string)

for _, topic := range raw {
k, v, err := parseTopic(topic)
if err != nil {
return nil, fmt.Errorf("error parsing topics: %w", err)
}

if topics[stream.Topic(k)] == nil {
topics[stream.Topic(k)] = []string{v}
} else {
topics[stream.Topic(k)] = append(topics[stream.Topic(k)], v)
}
}
return topics, nil
}

func parseTopic(topic string) (string, string, error) {
parts := strings.Split(topic, ":")
if len(parts) != 2 {
return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic)
}
return parts[0], parts[1], nil
}

func allTopics() map[stream.Topic][]string {
return map[stream.Topic][]string{"*": {"*"}}
}
148 changes: 148 additions & 0 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package agent

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type testEvent struct {
ID string
}

func TestEventStream(t *testing.T) {
t.Parallel()

httpTest(t, nil, func(s *TestAgent) {
ctx, cancel := context.WithCancel(context.Background())
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream", nil)
require.Nil(t, err)
resp := httptest.NewRecorder()

respErrCh := make(chan error)
go func() {
_, err = s.Server.EventStream(resp, req)
respErrCh <- err
assert.NoError(t, err)
}()

pub, err := s.Agent.server.State().EventPublisher()
require.NoError(t, err)
pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}})

testutil.WaitForResult(func() (bool, error) {
got := resp.Body.String()
want := `{"ID":"123"}`
if strings.Contains(got, want) {
return true, nil
}

return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
}, func(err error) {
cancel()
require.Fail(t, err.Error())
})

// wait for response to close to prevent race between subscription
// shutdown and server shutdown returning subscription closed by server err
// resp.Close()
cancel()
select {
case err := <-respErrCh:
require.Nil(t, err)
case <-time.After(1 * time.Second):
require.Fail(t, "waiting for request cancellation")
}
})
}

func TestEventStream_QueryParse(t *testing.T) {
t.Parallel()

cases := []struct {
desc string
query string
want map[stream.Topic][]string
wantErr bool
}{
{
desc: "all topics and keys specified",
query: "?topic=*:*",
want: map[stream.Topic][]string{
"*": []string{"*"},
},
},
{
desc: "all topics and keys inferred",
query: "",
want: map[stream.Topic][]string{
"*": []string{"*"},
},
},
{
desc: "invalid key value formatting",
query: "?topic=NodeDrain:*:*",
wantErr: true,
},
{
desc: "invalid key value formatting no value",
query: "?topic=NodeDrain",
wantErr: true,
},
{
desc: "single topic and key",
query: "?topic=NodeDrain:*",
want: map[stream.Topic][]string{
"NodeDrain": []string{"*"},
},
},
{
desc: "single topic multiple keys",
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"*",
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
},
},
{
desc: "multiple topics",
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
"NodeRegister": []string{
"*",
},
},
},
}

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
raw := fmt.Sprintf("http://localhost:80/v1/events%s", tc.query)
req, err := url.Parse(raw)
require.NoError(t, err)

got, err := parseEventTopics(req.Query())
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {

s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))

s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {
Expand Down
5 changes: 5 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type Config struct {
// in the absence of ACLs
EnableDebug bool

// EnableEventPublisher is used to enable or disable the state stores
// event publishing
EnableEventPublisher bool

// LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr.
LogOutput io.Writer
Expand Down Expand Up @@ -421,6 +425,7 @@ func DefaultConfig() *Config {
ReplicationBackoff: 30 * time.Second,
SentinelGCInterval: 30 * time.Second,
LicenseConfig: &LicenseConfig{},
EnableEventPublisher: true,
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,
Expand Down
Loading

0 comments on commit ff7fc0f

Please sign in to comment.