Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events/api #8947

Merged
merged 1 commit into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package agent

import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/docker/docker/pkg/ioutils"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)

func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
query := req.URL.Query()

indexStr := query.Get("index")
if indexStr == "" {
indexStr = "0"
}
index, err := strconv.Atoi(indexStr)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Unable to parse index: %v", err))
}

topics, err := parseEventTopics(query)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Invalid topic query: %v", err))
}

args := &structs.EventStreamRequest{
Topics: topics,
Index: index,
}
resp.Header().Set("Content-Type", "application/json")
resp.Header().Set("Cache-Control", "no-cache")

s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)

// Make the RPC
var handler structs.StreamingRpcHandler
var handlerErr error

if server := s.agent.Server(); server != nil {
handler, handlerErr = server.StreamingRpcHandler("Event.Stream")
} else if client := s.agent.Client(); client != nil {
handler, handlerErr = client.RemoteStreamingRpcHandler("Event.Stream")
} else {
handlerErr = fmt.Errorf("misconfigured connection")
}

if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}

httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)

// Create a goroutine that closes the pipe if the connection closes
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
go func() {
<-ctx.Done()
httpPipe.Close()
}()

// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)

// create an error channel to handle errors
errCh := make(chan HTTPCodedError, 2)

go func() {
defer cancel()

// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}

for {
select {
case <-ctx.Done():
errCh <- nil
return
default:
}

// Decode the response
var res structs.EventStreamWrapper
if err := decoder.Decode(&res); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
return
}
errCh <- CodedError(500, err.Error())
return
}
decoder.Reset(httpPipe)

if err := res.Error; err != nil {
if err.Code != nil {
errCh <- CodedError(int(*err.Code), err.Error())
return
}
}

// Flush json entry to response
if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
}()

// invoke handler
handler(handlerPipe)
cancel()
codedErr := <-errCh

if codedErr != nil &&
(codedErr == io.EOF ||
strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error())) {
codedErr = nil
}

return nil, codedErr
}

func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) {
raw, ok := query["topic"]
if !ok {
return allTopics(), nil
}
topics := make(map[stream.Topic][]string)

for _, topic := range raw {
k, v, err := parseTopic(topic)
if err != nil {
return nil, fmt.Errorf("error parsing topics: %w", err)
}

if topics[stream.Topic(k)] == nil {
topics[stream.Topic(k)] = []string{v}
} else {
topics[stream.Topic(k)] = append(topics[stream.Topic(k)], v)
}
}
return topics, nil
}

func parseTopic(topic string) (string, string, error) {
parts := strings.Split(topic, ":")
if len(parts) != 2 {
return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic)
}
return parts[0], parts[1], nil
}

func allTopics() map[stream.Topic][]string {
return map[stream.Topic][]string{"*": {"*"}}
}
148 changes: 148 additions & 0 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package agent

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type testEvent struct {
ID string
}

func TestEventStream(t *testing.T) {
t.Parallel()

httpTest(t, nil, func(s *TestAgent) {
ctx, cancel := context.WithCancel(context.Background())
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream", nil)
require.Nil(t, err)
resp := httptest.NewRecorder()

respErrCh := make(chan error)
go func() {
_, err = s.Server.EventStream(resp, req)
respErrCh <- err
assert.NoError(t, err)
}()

pub, err := s.Agent.server.State().EventPublisher()
require.NoError(t, err)
pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}})

testutil.WaitForResult(func() (bool, error) {
got := resp.Body.String()
want := `{"ID":"123"}`
if strings.Contains(got, want) {
return true, nil
}

return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
}, func(err error) {
cancel()
require.Fail(t, err.Error())
})

// wait for response to close to prevent race between subscription
// shutdown and server shutdown returning subscription closed by server err
// resp.Close()
cancel()
select {
case err := <-respErrCh:
require.Nil(t, err)
case <-time.After(1 * time.Second):
require.Fail(t, "waiting for request cancellation")
}
})
}

func TestEventStream_QueryParse(t *testing.T) {
t.Parallel()

cases := []struct {
desc string
query string
want map[stream.Topic][]string
wantErr bool
}{
{
desc: "all topics and keys specified",
query: "?topic=*:*",
want: map[stream.Topic][]string{
"*": []string{"*"},
},
},
{
desc: "all topics and keys inferred",
query: "",
want: map[stream.Topic][]string{
"*": []string{"*"},
},
},
{
desc: "invalid key value formatting",
query: "?topic=NodeDrain:*:*",
wantErr: true,
},
{
desc: "invalid key value formatting no value",
query: "?topic=NodeDrain",
wantErr: true,
},
{
desc: "single topic and key",
query: "?topic=NodeDrain:*",
want: map[stream.Topic][]string{
"NodeDrain": []string{"*"},
},
},
{
desc: "single topic multiple keys",
query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"*",
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
},
},
{
desc: "multiple topics",
query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d",
want: map[stream.Topic][]string{
"NodeDrain": []string{
"3caace09-f1f4-4d23-b37a-9ab5eb75069d",
},
"NodeRegister": []string{
"*",
},
},
},
}

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
raw := fmt.Sprintf("http://localhost:80/v1/events%s", tc.query)
req, err := url.Parse(raw)
require.NoError(t, err)

got, err := parseEventTopics(req.Query())
if tc.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {

s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))

s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {
Expand Down
5 changes: 5 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type Config struct {
// in the absence of ACLs
EnableDebug bool

// EnableEventPublisher is used to enable or disable the state stores
// event publishing
EnableEventPublisher bool

// LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr.
LogOutput io.Writer
Expand Down Expand Up @@ -421,6 +425,7 @@ func DefaultConfig() *Config {
ReplicationBackoff: 30 * time.Second,
SentinelGCInterval: 30 * time.Second,
LicenseConfig: &LicenseConfig{},
EnableEventPublisher: true,
AutopilotConfig: &structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 200 * time.Millisecond,
Expand Down
Loading