From d7b63a915d94d1d0b863834d71807ff1e4c5bde9 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Mon, 28 Sep 2020 10:13:10 -0400 Subject: [PATCH] Event Stream API/RPC (#8947) This Commit adds an /v1/events/stream endpoint to stream events from. The stream framer has been updated to include a SendFull method which does not fragment the data between multiple frames. This essentially treats the stream framer as a envelope to adhere to the stream framer interface in the UI. If the `encode` query parameter is omitted events will be streamed as newline delimted JSON. --- command/agent/event_endpoint.go | 169 +++++++++++++++ command/agent/event_endpoint_test.go | 148 ++++++++++++++ command/agent/http.go | 2 + nomad/config.go | 5 + nomad/event_endpoint.go | 213 +++++++++++++++++++ nomad/event_endpoint_test.go | 295 +++++++++++++++++++++++++++ nomad/fsm.go | 8 +- nomad/server.go | 15 +- nomad/state/state_changes.go | 16 +- nomad/state/state_store.go | 13 +- nomad/stream/event_buffer.go | 2 +- nomad/stream/event_publisher.go | 13 +- nomad/stream/ndjson.go | 114 +++++++++++ nomad/stream/ndjson_test.go | 72 +++++++ nomad/stream/subscription.go | 26 ++- nomad/stream/subscription_test.go | 16 ++ nomad/structs/structs.go | 32 +++ nomad/testing.go | 1 + 18 files changed, 1129 insertions(+), 31 deletions(-) create mode 100644 command/agent/event_endpoint.go create mode 100644 command/agent/event_endpoint_test.go create mode 100644 nomad/event_endpoint.go create mode 100644 nomad/event_endpoint_test.go create mode 100644 nomad/stream/ndjson.go create mode 100644 nomad/stream/ndjson_test.go diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go new file mode 100644 index 00000000000..5684d9c14e8 --- /dev/null +++ b/command/agent/event_endpoint.go @@ -0,0 +1,169 @@ +package agent + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/docker/docker/pkg/ioutils" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" +) + +func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + query := req.URL.Query() + + indexStr := query.Get("index") + if indexStr == "" { + indexStr = "0" + } + index, err := strconv.Atoi(indexStr) + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Unable to parse index: %v", err)) + } + + topics, err := parseEventTopics(query) + if err != nil { + return nil, CodedError(400, fmt.Sprintf("Invalid topic query: %v", err)) + } + + args := &structs.EventStreamRequest{ + Topics: topics, + Index: index, + } + resp.Header().Set("Content-Type", "application/json") + resp.Header().Set("Cache-Control", "no-cache") + + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) + + // Make the RPC + var handler structs.StreamingRpcHandler + var handlerErr error + + if server := s.agent.Server(); server != nil { + handler, handlerErr = server.StreamingRpcHandler("Event.Stream") + } else if client := s.agent.Client(); client != nil { + handler, handlerErr = client.RemoteStreamingRpcHandler("Event.Stream") + } else { + handlerErr = fmt.Errorf("misconfigured connection") + } + + if handlerErr != nil { + return nil, CodedError(500, handlerErr.Error()) + } + + httpPipe, handlerPipe := net.Pipe() + decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) + encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) + + // Create a goroutine that closes the pipe if the connection closes + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + go func() { + <-ctx.Done() + httpPipe.Close() + }() + + // Create an output that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + + // create an error channel to handle errors + errCh := make(chan HTTPCodedError, 2) + + go func() { + defer cancel() + + // Send the request + if err := encoder.Encode(args); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + + for { + select { + case <-ctx.Done(): + errCh <- nil + return + default: + } + + // Decode the response + var res structs.EventStreamWrapper + if err := decoder.Decode(&res); err != nil { + if err == io.EOF || err == io.ErrClosedPipe { + return + } + errCh <- CodedError(500, err.Error()) + return + } + decoder.Reset(httpPipe) + + if err := res.Error; err != nil { + if err.Code != nil { + errCh <- CodedError(int(*err.Code), err.Error()) + return + } + } + + // Flush json entry to response + if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil { + errCh <- CodedError(500, err.Error()) + return + } + } + }() + + // invoke handler + handler(handlerPipe) + cancel() + codedErr := <-errCh + + if codedErr != nil && + (codedErr == io.EOF || + strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error())) { + codedErr = nil + } + + return nil, codedErr +} + +func parseEventTopics(query url.Values) (map[stream.Topic][]string, error) { + raw, ok := query["topic"] + if !ok { + return allTopics(), nil + } + topics := make(map[stream.Topic][]string) + + for _, topic := range raw { + k, v, err := parseTopic(topic) + if err != nil { + return nil, fmt.Errorf("error parsing topics: %w", err) + } + + if topics[stream.Topic(k)] == nil { + topics[stream.Topic(k)] = []string{v} + } else { + topics[stream.Topic(k)] = append(topics[stream.Topic(k)], v) + } + } + return topics, nil +} + +func parseTopic(topic string) (string, string, error) { + parts := strings.Split(topic, ":") + if len(parts) != 2 { + return "", "", fmt.Errorf("Invalid key value pair for topic, topic: %s", topic) + } + return parts[0], parts[1], nil +} + +func allTopics() map[stream.Topic][]string { + return map[stream.Topic][]string{"*": {"*"}} +} diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go new file mode 100644 index 00000000000..88987e3d9de --- /dev/null +++ b/command/agent/event_endpoint_test.go @@ -0,0 +1,148 @@ +package agent + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testEvent struct { + ID string +} + +func TestEventStream(t *testing.T) { + t.Parallel() + + httpTest(t, nil, func(s *TestAgent) { + ctx, cancel := context.WithCancel(context.Background()) + req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream", nil) + require.Nil(t, err) + resp := httptest.NewRecorder() + + respErrCh := make(chan error) + go func() { + _, err = s.Server.EventStream(resp, req) + respErrCh <- err + assert.NoError(t, err) + }() + + pub, err := s.Agent.server.State().EventPublisher() + require.NoError(t, err) + pub.Publish(100, []stream.Event{{Payload: testEvent{ID: "123"}}}) + + testutil.WaitForResult(func() (bool, error) { + got := resp.Body.String() + want := `{"ID":"123"}` + if strings.Contains(got, want) { + return true, nil + } + + return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want) + }, func(err error) { + cancel() + require.Fail(t, err.Error()) + }) + + // wait for response to close to prevent race between subscription + // shutdown and server shutdown returning subscription closed by server err + // resp.Close() + cancel() + select { + case err := <-respErrCh: + require.Nil(t, err) + case <-time.After(1 * time.Second): + require.Fail(t, "waiting for request cancellation") + } + }) +} + +func TestEventStream_QueryParse(t *testing.T) { + t.Parallel() + + cases := []struct { + desc string + query string + want map[stream.Topic][]string + wantErr bool + }{ + { + desc: "all topics and keys specified", + query: "?topic=*:*", + want: map[stream.Topic][]string{ + "*": []string{"*"}, + }, + }, + { + desc: "all topics and keys inferred", + query: "", + want: map[stream.Topic][]string{ + "*": []string{"*"}, + }, + }, + { + desc: "invalid key value formatting", + query: "?topic=NodeDrain:*:*", + wantErr: true, + }, + { + desc: "invalid key value formatting no value", + query: "?topic=NodeDrain", + wantErr: true, + }, + { + desc: "single topic and key", + query: "?topic=NodeDrain:*", + want: map[stream.Topic][]string{ + "NodeDrain": []string{"*"}, + }, + }, + { + desc: "single topic multiple keys", + query: "?topic=NodeDrain:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", + want: map[stream.Topic][]string{ + "NodeDrain": []string{ + "*", + "3caace09-f1f4-4d23-b37a-9ab5eb75069d", + }, + }, + }, + { + desc: "multiple topics", + query: "?topic=NodeRegister:*&topic=NodeDrain:3caace09-f1f4-4d23-b37a-9ab5eb75069d", + want: map[stream.Topic][]string{ + "NodeDrain": []string{ + "3caace09-f1f4-4d23-b37a-9ab5eb75069d", + }, + "NodeRegister": []string{ + "*", + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + raw := fmt.Sprintf("http://localhost:80/v1/events%s", tc.query) + req, err := url.Parse(raw) + require.NoError(t, err) + + got, err := parseEventTopics(req.Query()) + if tc.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 87b51c0aca8..777ff5974e9 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -326,6 +326,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration)) + s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream)) + if uiEnabled { s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()})))) } else { diff --git a/nomad/config.go b/nomad/config.go index 14e92adf33f..d96a53d00f0 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -78,6 +78,10 @@ type Config struct { // in the absence of ACLs EnableDebug bool + // EnableEventPublisher is used to enable or disable the state stores + // event publishing + EnableEventPublisher bool + // LogOutput is the location to write logs to. If this is not set, // logs will go to stderr. LogOutput io.Writer @@ -421,6 +425,7 @@ func DefaultConfig() *Config { ReplicationBackoff: 30 * time.Second, SentinelGCInterval: 30 * time.Second, LicenseConfig: &LicenseConfig{}, + EnableEventPublisher: true, AutopilotConfig: &structs.AutopilotConfig{ CleanupDeadServers: true, LastContactThreshold: 200 * time.Millisecond, diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go new file mode 100644 index 00000000000..5661200d43f --- /dev/null +++ b/nomad/event_endpoint.go @@ -0,0 +1,213 @@ +package nomad + +import ( + "context" + "io" + "time" + + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" +) + +type Event struct { + srv *Server +} + +func (e *Event) register() { + e.srv.streamingRpcs.Register("Event.Stream", e.stream) +} + +func (e *Event) stream(conn io.ReadWriteCloser) { + defer conn.Close() + + var args structs.EventStreamRequest + decoder := codec.NewDecoder(conn, structs.MsgpackHandle) + encoder := codec.NewEncoder(conn, structs.MsgpackHandle) + + if err := decoder.Decode(&args); err != nil { + handleJsonResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + // forward to appropriate region + if args.Region != e.srv.config.Region { + err := e.forwardStreamingRPC(args.Region, "Event.Stream", args, conn) + if err != nil { + handleJsonResultError(err, helper.Int64ToPtr(500), encoder) + } + return + } + + // ACL check + // TODO(drew) ACL checks need to be per topic + // All Events Management + // System Events Management + // Node Events NamespaceCapabilityReadEvents + // Job/Alloc Events NamespaceCapabilityReadEvents + if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { + handleJsonResultError(err, nil, encoder) + return + } else if aclObj != nil && !aclObj.IsManagement() { + handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder) + return + } + + // TODO(drew) handle streams without ACLS + reqToken := args.AuthToken + if reqToken == "" { + // generate a random request token + reqToken = uuid.Generate() + } + subReq := &stream.SubscribeRequest{ + Token: reqToken, + Topics: args.Topics, + Index: uint64(args.Index), + } + publisher, err := e.srv.State().EventPublisher() + if err != nil { + handleJsonResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // start subscription to publisher + subscription, err := publisher.Subscribe(subReq) + if err != nil { + handleJsonResultError(err, helper.Int64ToPtr(500), encoder) + return + } + defer subscription.Unsubscribe() + + ndJsonCh := make(chan *stream.NDJson) + errCh := make(chan error) + + jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second) + jsonStream.Run(ctx) + + // goroutine to detect remote side closing + go func() { + if _, err := conn.Read(nil); err != nil { + // One end of the pipe explicitly closed, exit + cancel() + return + } + select { + case <-errCh: + case <-ctx.Done(): + return + } + }() + + go func() { + defer cancel() + LOOP: + for { + events, err := subscription.Next(ctx) + if err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + break LOOP + } + + // Continue if there are no events + if events == nil { + continue + } + + // Send each event as its own frame + for _, e := range events { + if err := jsonStream.Send(e); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + break LOOP + } + } + } + }() + + var streamErr error +OUTER: + for { + select { + case streamErr = <-errCh: + break OUTER + case <-ctx.Done(): + break OUTER + case eventJSON, ok := <-ndJsonCh: + // check if ndjson may have been closed when an error occurred, + // check once more for an error. + if !ok { + select { + case streamErr = <-errCh: + // There was a pending error + default: + } + break OUTER + } + + var resp structs.EventStreamWrapper + resp.Event = eventJSON + + if err := encoder.Encode(resp); err != nil { + streamErr = err + break OUTER + } + encoder.Reset(conn) + } + + } + + if streamErr != nil { + handleJsonResultError(streamErr, helper.Int64ToPtr(500), encoder) + return + } + +} + +func (e *Event) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error { + server, err := e.srv.findRegionServer(region) + if err != nil { + return err + } + + return e.forwardStreamingRPCToServer(server, method, args, in) +} + +func (e *Event) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error { + srvConn, err := e.srv.streamingRpc(server, method) + if err != nil { + return err + } + defer srvConn.Close() + + outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + return err + } + + structs.Bridge(in, srvConn) + return nil +} + +// handleJsonResultError is a helper for sending an error with a potential +// error code. The transmission of the error is ignored if the error has been +// generated by the closing of the underlying transport. +func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) { + // Nothing to do as the conn is closed + if err == io.EOF { + return + } + + encoder.Encode(&structs.EventStreamWrapper{ + Error: structs.NewRpcError(err, code), + }) +} diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go new file mode 100644 index 00000000000..9e425619261 --- /dev/null +++ b/nomad/event_endpoint_test.go @@ -0,0 +1,295 @@ +package nomad + +import ( + "encoding/json" + "fmt" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/stream" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/mapstructure" + "github.com/stretchr/testify/require" +) + +func TestEventStream(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.EnableEventPublisher = true + }) + defer cleanupS1() + + // Create request for all topics and keys + req := structs.EventStreamRequest{ + Topics: map[stream.Topic][]string{"*": []string{"*"}}, + QueryOptions: structs.QueryOptions{ + Region: s1.Region(), + }, + } + + handler, err := s1.StreamingRpcHandler("Event.Stream") + require.Nil(t, err) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *structs.EventStreamWrapper) + + // invoke handler + go handler(p2) + + // send request + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg structs.EventStreamWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %w", err) + } + + streamMsg <- &msg + } + }() + + // retrieve publisher for server, send event + publisher, err := s1.State().EventPublisher() + require.NoError(t, err) + + node := mock.Node() + publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}}) + + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(t, encoder.Encode(req)) + + timeout := time.After(3 * time.Second) +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for event stream") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + // ignore heartbeat + if msg.Event == stream.NDJsonHeartbeat { + continue + } + + var event stream.Event + err = json.Unmarshal(msg.Event.Data, &event) + require.NoError(t, err) + + // decode fully to ensure we received expected out + var out structs.Node + cfg := &mapstructure.DecoderConfig{ + Metadata: nil, + Result: &out, + } + dec, err := mapstructure.NewDecoder(cfg) + dec.Decode(event.Payload) + require.NoError(t, err) + require.Equal(t, node.ID, out.ID) + break OUTER + } + } +} + +// TestEventStream_StreamErr asserts an error is returned when an event publisher +// closes its subscriptions +func TestEventStream_StreamErr(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.EnableEventPublisher = true + }) + defer cleanupS1() + + testutil.WaitForLeader(t, s1.RPC) + + req := structs.EventStreamRequest{ + Topics: map[stream.Topic][]string{"*": []string{"*"}}, + QueryOptions: structs.QueryOptions{ + Region: s1.Region(), + }, + } + + handler, err := s1.StreamingRpcHandler("Event.Stream") + require.Nil(t, err) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *structs.EventStreamWrapper) + + go handler(p2) + + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg structs.EventStreamWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %w", err) + } + + streamMsg <- &msg + } + }() + + publisher, err := s1.State().EventPublisher() + require.NoError(t, err) + + node := mock.Node() + publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}}) + + // send req + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(t, encoder.Encode(req)) + + // stop the publisher to force an error on subscription side + s1.State().StopEventPublisher() + + timeout := time.After(5 * time.Second) +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for event stream") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error == nil { + // race between error and receiving an event + // continue trying for error + continue + } + require.NotNil(t, msg.Error) + require.Contains(t, msg.Error.Error(), "subscription closed by server") + break OUTER + } + } +} + +// TestEventStream_RegionForward tests event streaming from one server +// to another in a different region +func TestEventStream_RegionForward(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.EnableEventPublisher = true + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.EnableEventPublisher = true + c.Region = "foo" + }) + defer cleanupS2() + + TestJoin(t, s1, s2) + + // Create request targed for region foo + req := structs.EventStreamRequest{ + Topics: map[stream.Topic][]string{"*": []string{"*"}}, + QueryOptions: structs.QueryOptions{ + Region: "foo", + }, + } + + // Query s1 handler + handler, err := s1.StreamingRpcHandler("Event.Stream") + require.Nil(t, err) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *structs.EventStreamWrapper) + + go handler(p2) + + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg structs.EventStreamWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %w", err) + } + + streamMsg <- &msg + } + }() + + // publish with server 2 + publisher, err := s2.State().EventPublisher() + require.NoError(t, err) + + node := mock.Node() + publisher.Publish(uint64(1), []stream.Event{{Topic: "test", Payload: node}}) + + // send req + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(t, encoder.Encode(req)) + + timeout := time.After(3 * time.Second) +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout waiting for event stream") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error != nil { + t.Fatalf("Got error: %v", msg.Error.Error()) + } + + if msg.Event == stream.NDJsonHeartbeat { + continue + } + + var event stream.Event + err = json.Unmarshal(msg.Event.Data, &event) + require.NoError(t, err) + + var out structs.Node + cfg := &mapstructure.DecoderConfig{ + Metadata: nil, + Result: &out, + } + dec, err := mapstructure.NewDecoder(cfg) + dec.Decode(event.Payload) + require.NoError(t, err) + require.Equal(t, node.ID, out.ID) + break OUTER + } + } +} + +// TODO(drew) acl test +func TestEventStream_ACL(t *testing.T) { +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 83053a15da9..e807a049f61 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -126,14 +126,17 @@ type FSMConfig struct { // Region is the region of the server embedding the FSM Region string + + EnableEventPublisher bool } // NewFSMPath is used to construct a new FSM with a blank state func NewFSM(config *FSMConfig) (*nomadFSM, error) { // Create a state store sconfig := &state.StateStoreConfig{ - Logger: config.Logger, - Region: config.Region, + Logger: config.Logger, + Region: config.Region, + EnablePublisher: config.EnableEventPublisher, } state, err := state.NewStateStore(sconfig) if err != nil { @@ -163,6 +166,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { // Close is used to cleanup resources associated with the FSM func (n *nomadFSM) Close() error { + n.state.StopEventPublisher() return nil } diff --git a/nomad/server.go b/nomad/server.go index d0977c1e7b4..dea6e29c1e6 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -275,6 +275,7 @@ type endpoints struct { ACL *ACL Scaling *Scaling Enterprise *EnterpriseEndpoints + Event *Event // Client endpoints ClientStats *ClientStats @@ -1162,6 +1163,9 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Agent = &Agent{srv: s} s.staticEndpoints.Agent.register() + + s.staticEndpoints.Event = &Event{srv: s} + s.staticEndpoints.Event.register() } // Register the static handlers @@ -1207,11 +1211,12 @@ func (s *Server) setupRaft() error { // Create the FSM fsmConfig := &FSMConfig{ - EvalBroker: s.evalBroker, - Periodic: s.periodicDispatcher, - Blocked: s.blockedEvals, - Logger: s.logger, - Region: s.Region(), + EvalBroker: s.evalBroker, + Periodic: s.periodicDispatcher, + Blocked: s.blockedEvals, + Logger: s.logger, + Region: s.Region(), + EnableEventPublisher: s.config.EnableEventPublisher, } var err error s.fsm, err = NewFSM(fsmConfig) diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index f105f49b5f3..be80c53c6bb 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -32,14 +32,14 @@ type Changes struct { // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are -// sent to the eventPublisher which will create and emit change events. +// sent to the EventPublisher which will create and emit change events. type changeTrackerDB struct { db *memdb.MemDB - publisher eventPublisher + publisher *stream.EventPublisher processChanges func(ReadTxn, Changes) ([]stream.Event, error) } -func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB { +func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor) *changeTrackerDB { return &changeTrackerDB{ db: db, publisher: publisher, @@ -49,15 +49,7 @@ func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn cha type changeProcessor func(ReadTxn, Changes) ([]stream.Event, error) -type eventPublisher interface { - Publish(index uint64, events []stream.Event) -} - -// noOpPublisher satisfies the eventPublisher interface and does nothing -type noOpPublisher struct{} - -func (n *noOpPublisher) Publish(index uint64, events []stream.Event) {} -func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error) { return []stream.Event{}, nil } +func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error) { return []stream.Event{}, nil } // ReadTxn returns a read-only transaction which behaves exactly the same as // memdb.Txn diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7616951974a..bd9529d8ccd 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -93,10 +93,11 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ EventBufferTTL: 1 * time.Hour, EventBufferSize: 250, + Logger: config.Logger, }) s.db = NewChangeTrackerDB(db, publisher, processDBChanges) } else { - s.db = NewChangeTrackerDB(db, &noOpPublisher{}, processDBChanges) + s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges) } // Initialize the state store with required enterprise objects @@ -107,6 +108,13 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { return s, nil } +func (s *StateStore) EventPublisher() (*stream.EventPublisher, error) { + if s.db.publisher == nil { + return nil, fmt.Errorf("EventPublisher not configured") + } + return s.db.publisher, nil +} + // Config returns the state store configuration. func (s *StateStore) Config() *StateStoreConfig { return s.config @@ -123,7 +131,8 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { config: s.config, } - store.db = NewChangeTrackerDB(memDBSnap, &noOpPublisher{}, noOpProcessChanges) + // Create a new change tracker DB that does not publish or track changes + store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges) snap := &StateSnapshot{ StateStore: store, diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go index 127ca5b9fda..145776225a2 100644 --- a/nomad/stream/event_buffer.go +++ b/nomad/stream/event_buffer.go @@ -256,7 +256,7 @@ func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*buf // state change (chan nil) as that's not threadsafe but detecting close is. select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, fmt.Errorf("waiting for next event: %w", ctx.Err()) case <-forceClose: return nil, fmt.Errorf("subscription closed") case <-i.link.ch: diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go index 7a0d78fe24c..aedc4bc101b 100644 --- a/nomad/stream/event_publisher.go +++ b/nomad/stream/event_publisher.go @@ -15,6 +15,7 @@ const ( type EventPublisherCfg struct { EventBufferSize int64 EventBufferTTL time.Duration + Logger hclog.Logger } type EventPublisher struct { @@ -56,8 +57,13 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish cfg.EventBufferTTL = 1 * time.Hour } + if cfg.Logger == nil { + cfg.Logger = hclog.NewNullLogger() + } + buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL) e := &EventPublisher{ + logger: cfg.Logger.Named("event_publisher"), eventBuf: buffer, publishCh: make(chan changeEvents), subscriptions: &subscriptions{ @@ -95,7 +101,12 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Index)) } - sub := newSubscription(req, head, func() {}) + // Empty head so that calling Next on sub + start := newBufferItem(req.Index, []Event{}) + start.link.next.Store(head) + close(start.link.ch) + + sub := newSubscription(req, start, e.subscriptions.unsubscribe(req)) e.subscriptions.add(req, sub) return sub, nil diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go new file mode 100644 index 00000000000..cb5195e54ef --- /dev/null +++ b/nomad/stream/ndjson.go @@ -0,0 +1,114 @@ +package stream + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "sync" + "time" +) + +var ( + // NDJsonHeartbeat is the NDJson to send as a heartbeat + // Avoids creating many heartbeat instances + NDJsonHeartbeat = &NDJson{Data: []byte("{}\n")} +) + +// NDJsonStream is used to send new line delimited JSON and heartbeats +// to a destination (out channel) +type NDJsonStream struct { + out chan<- *NDJson + + // heartbeat is the interval to send heartbeat messages to keep a connection + // open. + heartbeat *time.Ticker + + publishCh chan NDJson + exitCh chan struct{} + + l sync.Mutex + running bool +} + +// NNDJson is a wrapper for a Newline Delimited JSON object +type NDJson struct { + Data []byte +} + +// NewNNewNDJsonStream creates a new NDJson stream that will output NDJson structs +// to the passed output channel +func NewNDJsonStream(out chan<- *NDJson, heartbeat time.Duration) *NDJsonStream { + return &NDJsonStream{ + out: out, + heartbeat: time.NewTicker(heartbeat), + exitCh: make(chan struct{}), + publishCh: make(chan NDJson), + } +} + +// Run starts a long lived goroutine that handles sending +// heartbeats and processed json objects to the streams out channel as well +func (n *NDJsonStream) Run(ctx context.Context) { + n.l.Lock() + if n.running { + return + } + n.running = true + n.l.Unlock() + + go n.run(ctx) +} + +func (n *NDJsonStream) run(ctx context.Context) { + defer func() { + n.l.Lock() + n.running = false + n.l.Unlock() + close(n.exitCh) + }() + + for { + select { + case <-ctx.Done(): + return + case msg := <-n.publishCh: + n.out <- msg.Copy() + case <-n.heartbeat.C: + // Send a heartbeat frame + select { + case n.out <- NDJsonHeartbeat: + case <-ctx.Done(): + return + } + } + } +} + +// Send encodes an object into Newline delimited json. An error is returned +// if json encoding fails or if the stream is no longer running. +func (n *NDJsonStream) Send(obj interface{}) error { + n.l.Lock() + defer n.l.Unlock() + + buf := bytes.NewBuffer(nil) + if err := json.NewEncoder(buf).Encode(obj); err != nil { + return fmt.Errorf("marshaling json for stream: %w", err) + } + + select { + case n.publishCh <- NDJson{Data: buf.Bytes()}: + case <-n.exitCh: + return fmt.Errorf("stream is no longer running") + } + + return nil +} + +func (j *NDJson) Copy() *NDJson { + n := new(NDJson) + *n = *j + n.Data = make([]byte, len(j.Data)) + copy(n.Data, j.Data) + return n +} diff --git a/nomad/stream/ndjson_test.go b/nomad/stream/ndjson_test.go new file mode 100644 index 00000000000..8e807938fb7 --- /dev/null +++ b/nomad/stream/ndjson_test.go @@ -0,0 +1,72 @@ +package stream + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type testObj struct { + Name string `json:"name"` +} + +func TestNDJson(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + out := make(chan *NDJson) + s := NewNDJsonStream(out, 1*time.Second) + s.Run(ctx) + + require.NoError(t, s.Send(testObj{Name: "test"})) + + out1 := <-out + + var expected bytes.Buffer + expected.Write([]byte(`{"name":"test"}`)) + expected.Write([]byte("\n")) + + require.Equal(t, expected.Bytes(), out1.Data) + select { + case _ = <-out: + t.Fatalf("Did not expect another message") + case <-time.After(100 * time.Millisecond): + } +} + +func TestNDJson_Send_After_Stop(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + out := make(chan *NDJson) + s := NewNDJsonStream(out, 1*time.Second) + s.Run(ctx) + + // stop the stream + cancel() + + time.Sleep(10 * time.Millisecond) + require.Error(t, s.Send(testObj{})) +} + +func TestNDJson_HeartBeat(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + out := make(chan *NDJson) + s := NewNDJsonStream(out, 10*time.Millisecond) + s.Run(ctx) + + heartbeat := <-out + + require.Equal(t, NDJsonHeartbeat, heartbeat) +} diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index 6846e005bd3..0fc64512c19 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -21,10 +21,6 @@ const ( // closed. The client should Unsubscribe, then re-Subscribe. var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe") -// type Subscriber struct { -// logger hclog.Logger -// } - type Subscription struct { // state is accessed atomically 0 means open, 1 means closed with reload state uint32 @@ -104,8 +100,15 @@ func filter(req *SubscribeRequest, events []Event) []Event { var count int for _, e := range events { - if _, ok := req.Topics[e.Topic]; ok { - for _, k := range req.Topics[e.Topic] { + _, allTopics := req.Topics[AllKeys] + if _, ok := req.Topics[e.Topic]; ok || allTopics { + var keys []string + if allTopics { + keys = req.Topics[AllKeys] + } else { + keys = req.Topics[e.Topic] + } + for _, k := range keys { if e.Key == k || k == AllKeys { count++ } @@ -124,8 +127,15 @@ func filter(req *SubscribeRequest, events []Event) []Event { // Return filtered events result := make([]Event, 0, count) for _, e := range events { - if _, ok := req.Topics[e.Topic]; ok { - for _, k := range req.Topics[e.Topic] { + _, allTopics := req.Topics[AllKeys] + if _, ok := req.Topics[e.Topic]; ok || allTopics { + var keys []string + if allTopics { + keys = req.Topics[AllKeys] + } else { + keys = req.Topics[e.Topic] + } + for _, k := range keys { if e.Key == k || k == AllKeys { result = append(result, e) } diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go index 9d1f5572015..8dd841bd2be 100644 --- a/nomad/stream/subscription_test.go +++ b/nomad/stream/subscription_test.go @@ -10,6 +10,22 @@ func TestSubscription(t *testing.T) { } +func TestFilter_AllTopics(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "*": []string{"*"}, + }, + } + actual := filter(req, events) + require.Equal(t, events, actual) + + // ensure new array was not allocated + require.Equal(t, cap(actual), 5) +} + func TestFilter_AllKeys(t *testing.T) { events := make([]Event, 0, 5) events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 340f2e36c37..a416b3d4634 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -39,6 +39,7 @@ import ( "github.com/hashicorp/nomad/helper/constraints/semver" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/kheap" + "github.com/hashicorp/nomad/nomad/stream" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) @@ -10521,3 +10522,34 @@ type ACLTokenUpsertResponse struct { Tokens []*ACLToken WriteMeta } + +// EEventStreamRequest is used to stream events from a servers +// EventPublisher +type EventStreamRequest struct { + Topics map[stream.Topic][]string + Index int + + QueryOptions +} + +type EventStreamWrapper struct { + Error *RpcError + Event *stream.NDJson +} + +// RpcError is used for serializing errors with a potential error code +type RpcError struct { + Message string + Code *int64 +} + +func NewRpcError(err error, code *int64) *RpcError { + return &RpcError{ + Message: err.Error(), + Code: code, + } +} + +func (r *RpcError) Error() string { + return r.Message +} diff --git a/nomad/testing.go b/nomad/testing.go index f575139ccc2..edb2d1c8e49 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -47,6 +47,7 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) { config.Logger = testlog.HCLogger(t) config.Build = version.Version + "+unittest" config.DevMode = true + config.EnableEventPublisher = true config.BootstrapExpect = 1 nodeNum := atomic.AddUint32(&nodeNumber, 1) config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum)