-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This Commit adds an /v1/events/stream endpoint to stream events from. The stream framer has been updated to include a SendFull method which does not fragment the data between multiple frames. This essentially treats the stream framer as a envelope to adhere to the stream framer interface in the UI. If the `encode` query parameter is omitted events will be streamed as newline delimted JSON.
- Loading branch information
1 parent
76857ef
commit d748cb4
Showing
18 changed files
with
1,129 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
package agent | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"io" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/docker/docker/pkg/ioutils" | ||
"github.com/hashicorp/go-msgpack/codec" | ||
"github.com/hashicorp/nomad/nomad/stream" | ||
"github.com/hashicorp/nomad/nomad/structs" | ||
) | ||
|
||
func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { | ||
query := req.URL.Query() | ||
|
||
indexStr := query.Get("index") | ||
if indexStr == "" { | ||
indexStr = "0" | ||
} | ||
index, err := strconv.Atoi(indexStr) | ||
if err != nil { | ||
return nil, CodedError(400, fmt.Sprintf("Unable to parse index: %v", err)) | ||
} | ||
|
||
topics, err := parseEventTopics(query) | ||
if err != nil { | ||
return nil, CodedError(400, fmt.Sprintf("Invalid topic query: %v", err)) | ||
} | ||
|
||
args := &structs.EventStreamRequest{ | ||
Topics: topics, | ||
Index: index, | ||
} | ||
resp.Header().Set("Content-Type", "application/json") | ||
resp.Header().Set("Cache-Control", "no-cache") | ||
|
||
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) | ||
|
||
// Make the RPC | ||
var handler structs.StreamingRpcHandler | ||
var handlerErr error | ||
|
||
if server := s.agent.Server(); server != nil { | ||
handler, handlerErr = server.StreamingRpcHandler("Event.Stream") | ||
} else if client := s.agent.Client(); client != nil { | ||
handler, handlerErr = client.RemoteStreamingRpcHandler("Event.Stream") | ||
} else { | ||
handlerErr = fmt.Errorf("misconfigured connection") | ||
} | ||
|
||
if handlerErr != nil { | ||
return nil, CodedError(500, handlerErr.Error()) | ||
} | ||
|
||
httpPipe, handlerPipe := net.Pipe() | ||
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) | ||
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) | ||
|
||
// Create a goroutine that closes the pipe if the connection closes | ||
ctx, cancel := context.WithCancel(req.Context()) | ||
defer cancel() | ||
go func() { | ||
<-ctx.Done() | ||
httpPipe.Close() | ||
}() | ||
|
||
// Create an output that gets flushed on every write | ||
output := ioutils.NewWriteFlusher(resp) | ||
|
||
// create an error channel to handle errors | ||
errCh := make(chan HTTPCodedError, 2) | ||
|
||
go func() { | ||
defer cancel() | ||
|
||
// Send the request | ||
if err := encoder.Encode(args); err != nil { | ||
errCh <- CodedError(500, err.Error()) | ||
return | ||
} | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
errCh <- nil | ||
return | ||
default: | ||
} | ||
|
||
// Decode the response | ||
var res structs.EventStreamWrapper | ||
if err := decoder.Decode(&res); err != nil { | ||
if err == io.EOF || err == io.ErrClosedPipe { | ||
return | ||
} | ||
errCh <- CodedError(500, err.Error()) | ||
return | ||
} | ||
decoder.Reset(httpPipe) | ||
|
||
if err := res.Error; err != nil { | ||
if err.Code != nil { | ||
errCh <- CodedError(int(*err.Code), err.Error()) | ||
return | ||
} | ||
} | ||
|
||
// Flush json entry to response | ||
if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil { | ||
errCh <- CodedError(500, err.Error()) | ||
return | ||
} | ||
} | ||
}() | ||
|
||
// invoke handler | ||
handler(handlerPipe) | ||
cancel() | ||
codedErr := <-errCh | ||
|
||
if codedErr != nil && | ||
(codedErr == io.EOF || | ||
strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error())) { | ||
codedErr = nil | ||
} | ||
|
||
return nil, codedErr | ||
} | ||
|
||
func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) { | ||
raw, ok := query["topic"] | ||
if !ok { | ||
return allTopics(), nil | ||
} | ||
topics := make(map[stream.Topic][]string) | ||
|
||
for _, topic := range raw { | ||
k, v, err := parseTopic(topic) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing topics: %w", err) | ||
} | ||
|
||
if topics[stream.Topic(k)] == nil { | ||
topics[stream.Topic(k)] = []string{v} | ||
} else { | ||
topics[stream.Topic(k)] = append(topics[stream.Topic(k)], v) | ||
} | ||
} | ||
return topics, nil | ||
} | ||
|
||
func parseTopic(topic string) (string, string, error) { | ||
parts := strings.Split(topic, ":") | ||
if len(parts) != 2 { | ||
return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic) | ||
} | ||
return parts[0], parts[1], nil | ||
} | ||
|
||
func allTopics() map[stream.Topic][]string { | ||
return map[stream.Topic][]string{"*": {"*"}} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package agent | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/http" | ||
"net/http/httptest" | ||
"net/url" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
||
"github.com/hashicorp/nomad/nomad/stream" | ||
"github.com/hashicorp/nomad/testutil" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type testEvent struct { | ||
ID string | ||
} | ||
|
||
func TestEventStream(t *testing.T) { | ||
t.Parallel() | ||
|
||
httpTest(t, nil, func(s *TestAgent) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream", nil) | ||
require.Nil(t, err) | ||
resp := httptest.NewRecorder() | ||
|
||
respErrCh := make(chan error) | ||
go func() { | ||
_, err = s.Server.EventStream(resp, req) | ||
respErrCh <- err | ||
assert.NoError(t, err) | ||
}() | ||
|
||
pub, err := s.Agent.server.State().EventPublisher() | ||
require.NoError(t, err) | ||
pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}}) | ||
|
||
testutil.WaitForResult(func() (bool, error) { | ||
got := resp.Body.String() | ||
want := `{"ID":"123"}` | ||
if strings.Contains(got, want) { | ||
return true, nil | ||
} | ||
|
||
return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want) | ||
}, func(err error) { | ||
cancel() | ||
require.Fail(t, err.Error()) | ||
}) | ||
|
||
// wait for response to close to prevent race between subscription | ||
// shutdown and server shutdown returning subscription closed by server err | ||
// resp.Close() | ||
cancel() | ||
select { | ||
case err := <-respErrCh: | ||
require.Nil(t, err) | ||
case <-time.After(1 * time.Second): | ||
require.Fail(t, "waiting for request cancellation") | ||
} | ||
}) | ||
} | ||
|
||
func TestEventStream_QueryParse(t *testing.T) { | ||
t.Parallel() | ||
|
||
cases := []struct { | ||
desc string | ||
query string | ||
want map[stream.Topic][]string | ||
wantErr bool | ||
}{ | ||
{ | ||
desc: "all topics and keys specified", | ||
query: "?topic=*:*", | ||
want: map[stream.Topic][]string{ | ||
"*": []string{"*"}, | ||
}, | ||
}, | ||
{ | ||
desc: "all topics and keys inferred", | ||
query: "", | ||
want: map[stream.Topic][]string{ | ||
"*": []string{"*"}, | ||
}, | ||
}, | ||
{ | ||
desc: "invalid key value formatting", | ||
query: "?topic=NodeDrain:*:*", | ||
wantErr: true, | ||
}, | ||
{ | ||
desc: "invalid key value formatting no value", | ||
query: "?topic=NodeDrain", | ||
wantErr: true, | ||
}, | ||
{ | ||
desc: "single topic and key", | ||
query: "?topic=NodeDrain:*", | ||
want: map[stream.Topic][]string{ | ||
"NodeDrain": []string{"*"}, | ||
}, | ||
}, | ||
{ | ||
desc: "single topic multiple keys", | ||
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", | ||
want: map[stream.Topic][]string{ | ||
"NodeDrain": []string{ | ||
"*", | ||
"3caace09-f1f4-4d23-b37a-9ab5eb75069d", | ||
}, | ||
}, | ||
}, | ||
{ | ||
desc: "multiple topics", | ||
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", | ||
want: map[stream.Topic][]string{ | ||
"NodeDrain": []string{ | ||
"3caace09-f1f4-4d23-b37a-9ab5eb75069d", | ||
}, | ||
"NodeRegister": []string{ | ||
"*", | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
t.Run(tc.desc, func(t *testing.T) { | ||
raw := fmt.Sprintf("http://localhost:80/v1/events%s", tc.query) | ||
req, err := url.Parse(raw) | ||
require.NoError(t, err) | ||
|
||
got, err := parseEventTopics(req.Query()) | ||
if tc.wantErr { | ||
require.Error(t, err) | ||
return | ||
} | ||
require.NoError(t, err) | ||
require.Equal(t, tc.want, got) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.