From 3c15f41411593bddc3d233a98f2393923d9e3ab9 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 8 Oct 2020 14:27:52 -0400 Subject: [PATCH] filter on additional filter keys, remove switch statement duplication properly wire up durable event count move newline responsibility moves newline creation from NDJson to the http handler, json stream only encodes and sends now ignore snapshot restore if broker is disabled enable dev mode to access event steam without acl use mapping instead of switch use pointers for config sizes, remove unused ttl, simplify closed conn logic --- api/event.go | 4 +- api/event_test.go | 60 +- command/agent/agent.go | 11 +- command/agent/agent_test.go | 3 +- command/agent/config.go | 28 +- command/agent/config_parse_test.go | 6 +- command/agent/config_test.go | 47 +- command/agent/event_endpoint.go | 45 +- command/agent/event_endpoint_test.go | 14 +- command/agent/testdata/basic.hcl | 4 +- command/agent/testdata/basic.json | 4 +- nomad/config.go | 12 +- nomad/event_endpoint.go | 99 +-- nomad/event_endpoint_test.go | 143 ++++- nomad/fsm.go | 48 +- nomad/fsm_test.go | 20 +- nomad/node_endpoint_test.go | 2 +- nomad/server.go | 15 +- nomad/state/deployment_events_test.go | 89 +-- nomad/state/events.go | 111 ++-- nomad/state/events_test.go | 567 ++++++++++++++++++ nomad/state/node_events.go | 4 +- nomad/state/node_events_test.go | 34 +- nomad/state/state_changes.go | 77 +-- nomad/state/state_store.go | 56 +- nomad/state/state_store_events_test.go | 4 +- nomad/state/state_store_test.go | 36 +- nomad/stream/event_buffer.go | 37 +- nomad/stream/event_buffer_test.go | 28 +- nomad/stream/event_publisher.go | 132 ++-- nomad/stream/event_publisher_test.go | 37 +- nomad/stream/ndjson.go | 93 ++- nomad/stream/ndjson_test.go | 37 +- nomad/stream/subscription.go | 36 +- nomad/stream/subscription_test.go | 23 +- nomad/structs/structs.go | 51 +- nomad/testing.go | 2 +- .../github.com/hashicorp/nomad/api/event.go | 10 +- 38 files changed, 1369 insertions(+), 660 deletions(-) create mode 100644 nomad/state/events_test.go diff --git a/api/event.go b/api/event.go index 2bf08fba579..f89e222848f 100644 --- a/api/event.go +++ b/api/event.go @@ -47,11 +47,11 @@ func (c *Client) EventStream() *EventStream { // Stream establishes a new subscription to Nomad's event stream and streams // results back to the returned channel. func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) { - r, err := e.client.newRequest("GET", "/v1/event/stream") if err != nil { return nil, err } + q = q.WithContext(ctx) r.setQueryOptions(q) // Build topic query params @@ -82,7 +82,7 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind // select eventsCh events = Events{Err: err} } - if events.IsHeartbeat() { + if events.Err == nil && events.IsHeartbeat() { continue } diff --git a/api/event_test.go b/api/event_test.go index 9bd4ae3a4dc..80820e97a80 100644 --- a/api/event_test.go +++ b/api/event_test.go @@ -34,20 +34,15 @@ func TestEvent_Stream(t *testing.T) { streamCh, err := events.Stream(ctx, topics, 0, q) require.NoError(t, err) -OUTER: - for { - select { - case event := <-streamCh: - if event.Err != nil { - require.Fail(t, err.Error()) - } - require.Equal(t, len(event.Events), 1) - require.Equal(t, "Eval", string(event.Events[0].Topic)) - - break OUTER - case <-time.After(5 * time.Second): - require.Fail(t, "failed waiting for event stream event") + select { + case event := <-streamCh: + if event.Err != nil { + require.Fail(t, err.Error()) } + require.Equal(t, len(event.Events), 1) + require.Equal(t, "Eval", string(event.Events[0].Topic)) + case <-time.After(5 * time.Second): + require.Fail(t, "failed waiting for event stream event") } } @@ -76,4 +71,43 @@ func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) { _, err = events.Stream(ctx, topics, 0, q) require.Error(t, err) + require.Contains(t, err.Error(), "400") + require.Contains(t, err.Error(), "Invalid key value pair") +} + +func TestEvent_Stream_CloseCtx(t *testing.T) { + t.Parallel() + + c, s := makeClient(t, nil, nil) + defer s.Stop() + + // register job to generate events + jobs := c.Jobs() + job := testJob() + resp2, _, err := jobs.Register(job, nil) + require.Nil(t, err) + require.NotNil(t, resp2) + + // build event stream request + events := c.EventStream() + q := &QueryOptions{} + topics := map[Topic][]string{ + "Eval": {"*"}, + } + + ctx, cancel := context.WithCancel(context.Background()) + + streamCh, err := events.Stream(ctx, topics, 0, q) + require.NoError(t, err) + + // cancel the request + cancel() + + select { + case event, ok := <-streamCh: + require.False(t, ok) + require.Nil(t, event) + case <-time.After(5 * time.Second): + require.Fail(t, "failed waiting for event stream event") + } } diff --git a/command/agent/agent.go b/command/agent/agent.go index 54de24470f6..e9d9f882bd5 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -243,11 +243,14 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { if agentConfig.Server.UpgradeVersion != "" { conf.UpgradeVersion = agentConfig.Server.UpgradeVersion } - if agentConfig.Server.EnableEventPublisher != nil { - conf.EnableEventPublisher = *agentConfig.Server.EnableEventPublisher + if agentConfig.Server.EnableEventBroker != nil { + conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker } - if agentConfig.Server.EventBufferSize > 0 { - conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize) + if agentConfig.Server.EventBufferSize != nil { + conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize) + } + if agentConfig.Server.DurableEventCount != nil { + conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount) } if agentConfig.Autopilot != nil { if agentConfig.Autopilot.CleanupDeadServers != nil { diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 1ac90044465..5c71a70b3c6 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -57,7 +57,8 @@ func TestAgent_ServerConfig(t *testing.T) { out, err := a.serverConfig() require.NoError(t, err) - require.True(t, out.EnableEventPublisher) + require.True(t, out.EnableEventBroker) + require.Equal(t, int64(100), out.DurableEventCount) serfAddr := out.SerfConfig.MemberlistConfig.AdvertiseAddr require.Equal(t, "127.0.0.1", serfAddr) diff --git a/command/agent/config.go b/command/agent/config.go index 9665fcaa9ad..f9690efffab 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -484,18 +484,18 @@ type ServerConfig struct { // This value is ignored. DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"` - // EnableEventPublisher configures whether this server's state store + // EnableEventBroker configures whether this server's state store // will generate events for its event stream. - EnableEventPublisher *bool `hcl:"enable_event_publisher"` + EnableEventBroker *bool `hcl:"enable_event_broker"` // EventBufferSize configure the amount of events to be held in memory. - // If EnableEventPublisher is set to true, the minimum allowable value + // If EnableEventBroker is set to true, the minimum allowable value // for the EventBufferSize is 1. - EventBufferSize int `hcl:"event_buffer_size"` + EventBufferSize *int `hcl:"event_buffer_size"` // DurableEventCount specifies the amount of events to persist during snapshot generation. // A count of 0 signals that no events should be persisted. - DurableEventCount int `hcl:"durable_event_count"` + DurableEventCount *int `hcl:"durable_event_count"` // ExtraKeysHCL is used by hcl to surface unexpected keys ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` @@ -887,11 +887,11 @@ func DefaultConfig() *Config { BindWildcardDefaultHostNetwork: true, }, Server: &ServerConfig{ - Enabled: false, - EnableEventPublisher: helper.BoolToPtr(true), - EventBufferSize: 100, - DurableEventCount: 100, - StartJoin: []string{}, + Enabled: false, + EnableEventBroker: helper.BoolToPtr(true), + EventBufferSize: helper.IntToPtr(100), + DurableEventCount: helper.IntToPtr(100), + StartJoin: []string{}, ServerJoin: &ServerJoin{ RetryJoin: []string{}, RetryInterval: 30 * time.Second, @@ -1415,15 +1415,15 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin) } - if b.EnableEventPublisher != nil { - result.EnableEventPublisher = b.EnableEventPublisher + if b.EnableEventBroker != nil { + result.EnableEventBroker = b.EnableEventBroker } - if b.EventBufferSize != 0 { + if b.EventBufferSize != nil { result.EventBufferSize = b.EventBufferSize } - if b.DurableEventCount != 0 { + if b.DurableEventCount != nil { result.DurableEventCount = b.DurableEventCount } diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 8999fa28120..771ff19731f 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -122,9 +122,9 @@ var basicConfig = &Config{ RedundancyZone: "foo", UpgradeVersion: "0.8.0", EncryptKey: "abc", - EnableEventPublisher: helper.BoolToPtr(false), - EventBufferSize: 200, - DurableEventCount: 100, + EnableEventBroker: helper.BoolToPtr(false), + EventBufferSize: helper.IntToPtr(200), + DurableEventCount: helper.IntToPtr(0), ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, RetryInterval: time.Duration(15) * time.Second, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 59aa42e0572..4e0aa934609 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -138,7 +138,9 @@ func TestConfig_Merge(t *testing.T) { MaxHeartbeatsPerSecond: 30.0, RedundancyZone: "foo", UpgradeVersion: "foo", - EnableEventPublisher: helper.BoolToPtr(false), + EnableEventBroker: helper.BoolToPtr(false), + EventBufferSize: helper.IntToPtr(0), + DurableEventCount: helper.IntToPtr(0), }, ACL: &ACLConfig{ Enabled: true, @@ -329,7 +331,9 @@ func TestConfig_Merge(t *testing.T) { NonVotingServer: true, RedundancyZone: "bar", UpgradeVersion: "bar", - EnableEventPublisher: helper.BoolToPtr(true), + EnableEventBroker: helper.BoolToPtr(true), + DurableEventCount: helper.IntToPtr(100), + EventBufferSize: helper.IntToPtr(100), }, ACL: &ACLConfig{ Enabled: true, @@ -1166,40 +1170,57 @@ func TestTelemetry_Parse(t *testing.T) { require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics) } -func TestEventPublisher_Parse(t *testing.T) { +func TestEventBroker_Parse(t *testing.T) { require := require.New(t) - { a := &ServerConfig{ - EnableEventPublisher: helper.BoolToPtr(false), + EnableEventBroker: helper.BoolToPtr(false), + EventBufferSize: helper.IntToPtr(0), + DurableEventCount: helper.IntToPtr(0), } b := DefaultConfig().Server - b.EnableEventPublisher = nil + b.EnableEventBroker = nil + b.EventBufferSize = nil + b.DurableEventCount = nil result := a.Merge(b) - require.Equal(false, *result.EnableEventPublisher) + require.Equal(false, *result.EnableEventBroker) + require.Equal(0, *result.EventBufferSize) + require.Equal(0, *result.DurableEventCount) } { a := &ServerConfig{ - EnableEventPublisher: helper.BoolToPtr(true), + EnableEventBroker: helper.BoolToPtr(true), + EventBufferSize: helper.IntToPtr(5000), + DurableEventCount: helper.IntToPtr(200), } b := DefaultConfig().Server - b.EnableEventPublisher = nil + b.EnableEventBroker = nil + b.EventBufferSize = nil + b.DurableEventCount = nil result := a.Merge(b) - require.Equal(true, *result.EnableEventPublisher) + require.Equal(true, *result.EnableEventBroker) + require.Equal(5000, *result.EventBufferSize) + require.Equal(200, *result.DurableEventCount) } { a := &ServerConfig{ - EnableEventPublisher: helper.BoolToPtr(false), + EnableEventBroker: helper.BoolToPtr(false), + EventBufferSize: helper.IntToPtr(0), + DurableEventCount: helper.IntToPtr(0), } b := DefaultConfig().Server - b.EnableEventPublisher = helper.BoolToPtr(true) + b.EnableEventBroker = helper.BoolToPtr(true) + b.EventBufferSize = helper.IntToPtr(20000) + b.DurableEventCount = helper.IntToPtr(1000) result := a.Merge(b) - require.Equal(true, *result.EnableEventPublisher) + require.Equal(true, *result.EnableEventBroker) + require.Equal(20000, *result.EventBufferSize) + require.Equal(1000, *result.DurableEventCount) } } diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go index af46620acf5..d28ca84e772 100644 --- a/command/agent/event_endpoint.go +++ b/command/agent/event_endpoint.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker/pkg/ioutils" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/nomad/nomad/structs" + "golang.org/x/sync/errgroup" ) func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -40,12 +41,12 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i resp.Header().Set("Content-Type", "application/json") resp.Header().Set("Cache-Control", "no-cache") + // Set region, namespace and authtoken to args s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) - // Make the RPC + // Determine the RPC handler to use to find a server var handler structs.StreamingRpcHandler var handlerErr error - if server := s.agent.Server(); server != nil { handler, handlerErr = server.StreamingRpcHandler("Event.Stream") } else if client := s.agent.Client(); client != nil { @@ -73,57 +74,51 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - // create an error channel to handle errors - errCh := make(chan HTTPCodedError, 1) - - go func() { + // send request and decode events + errs, errCtx := errgroup.WithContext(ctx) + errs.Go(func() error { defer cancel() // Send the request if err := encoder.Encode(args); err != nil { - errCh <- CodedError(500, err.Error()) - return + return CodedError(500, err.Error()) } for { select { - case <-ctx.Done(): - errCh <- nil - return + case <-errCtx.Done(): + return nil default: } // Decode the response var res structs.EventStreamWrapper if err := decoder.Decode(&res); err != nil { - if err == io.EOF || err == io.ErrClosedPipe { - return - } - errCh <- CodedError(500, err.Error()) - return + return CodedError(500, err.Error()) } decoder.Reset(httpPipe) if err := res.Error; err != nil { if err.Code != nil { - errCh <- CodedError(int(*err.Code), err.Error()) - return + return CodedError(int(*err.Code), err.Error()) } } // Flush json entry to response if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil { - errCh <- CodedError(500, err.Error()) - return + return CodedError(500, err.Error()) } + // Each entry is its own new line according to ndjson.org + // append new line to each entry + fmt.Fprint(output, "\n") } - }() + }) // invoke handler handler(handlerPipe) cancel() - codedErr := <-errCh + codedErr := errs.Wait() if codedErr != nil && strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error()) { codedErr = nil } @@ -144,11 +139,7 @@ func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) { return nil, fmt.Errorf("error parsing topics: %w", err) } - if topics[structs.Topic(k)] == nil { - topics[structs.Topic(k)] = []string{v} - } else { - topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v) - } + topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v) } return topics, nil } diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index 3b95488d193..c450a917988 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -37,7 +38,7 @@ func TestEventStream(t *testing.T) { assert.NoError(t, err) }() - pub, err := s.Agent.server.State().EventPublisher() + pub, err := s.Agent.server.State().EventBroker() require.NoError(t, err) pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}}) @@ -71,6 +72,8 @@ func TestEventStream_NamespaceQuery(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream?namespace=foo", nil) require.Nil(t, err) resp := httptest.NewRecorder() @@ -82,17 +85,17 @@ func TestEventStream_NamespaceQuery(t *testing.T) { assert.NoError(t, err) }() - pub, err := s.Agent.server.State().EventPublisher() + pub, err := s.Agent.server.State().EventBroker() require.NoError(t, err) - pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Namespace: "bar", Payload: testEvent{ID: "123"}}}}) + badID := uuid.Generate() + pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Namespace: "bar", Payload: testEvent{ID: badID}}}}) pub.Publish(&structs.Events{Index: 101, Events: []structs.Event{{Namespace: "foo", Payload: testEvent{ID: "456"}}}}) testutil.WaitForResult(func() (bool, error) { got := resp.Body.String() want := `"Namespace":"foo"` - bad := `123` - if strings.Contains(got, bad) { + if strings.Contains(got, badID) { return false, fmt.Errorf("expected non matching namespace to be filtered, got:%v", got) } if strings.Contains(got, want) { @@ -101,7 +104,6 @@ func TestEventStream_NamespaceQuery(t *testing.T) { return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want) }, func(err error) { - cancel() require.Fail(t, err.Error()) }) diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index be35093e980..66d9f9d0f10 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -130,9 +130,9 @@ server { upgrade_version = "0.8.0" encrypt = "abc" raft_multiplier = 4 - enable_event_publisher = false + enable_event_broker = false event_buffer_size = 200 - durable_event_count = 100 + durable_event_count = 0 server_join { retry_join = ["1.1.1.1", "2.2.2.2"] diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index d9db2d5d0f1..c20889fc635 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -261,9 +261,9 @@ "data_dir": "/tmp/data", "deployment_gc_threshold": "12h", "enabled": true, - "enable_event_publisher": false, + "enable_event_broker": false, "event_buffer_size": 200, - "durable_event_count": 100, + "durable_event_count": 0, "enabled_schedulers": [ "test" ], diff --git a/nomad/config.go b/nomad/config.go index 0a2272be884..93abdc6f132 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -78,13 +78,17 @@ type Config struct { // in the absence of ACLs EnableDebug bool - // EnableEventPublisher is used to enable or disable state store + // EnableEventBroker is used to enable or disable state store // event publishing - EnableEventPublisher bool + EnableEventBroker bool // EventBufferSize is the amount of events to hold in memory. EventBufferSize int64 + // DurableEventCount is the amount of events to save to disk when + // snapshotting + DurableEventCount int64 + // LogOutput is the location to write logs to. If this is not set, // logs will go to stderr. LogOutput io.Writer @@ -420,7 +424,9 @@ func DefaultConfig() *Config { ReplicationBackoff: 30 * time.Second, SentinelGCInterval: 30 * time.Second, LicenseConfig: &LicenseConfig{}, - EnableEventPublisher: true, + EnableEventBroker: true, + EventBufferSize: 100, + DurableEventCount: 100, AutopilotConfig: &structs.AutopilotConfig{ CleanupDeadServers: true, LastContactThreshold: 200 * time.Millisecond, diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 3ee249cab2d..3b980981496 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -2,12 +2,14 @@ package nomad import ( "context" + "fmt" "io" + "io/ioutil" "time" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) @@ -41,35 +43,29 @@ func (e *Event) stream(conn io.ReadWriteCloser) { return } - // ACL check - // TODO(drew) ACL checks need to be per topic - // All Events Management - // System Events Management - // Node Events NamespaceCapabilityReadEvents - // Job/Alloc Events NamespaceCapabilityReadEvents - if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { + aclObj, err := e.srv.ResolveToken(args.AuthToken) + if err != nil { handleJsonResultError(err, nil, encoder) return - } else if aclObj != nil && !aclObj.IsManagement() { - handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder) - return } - // authToken is passed to the subscribe request so the event stream - // can handle closing a subscription if the authToken expires. - // If ACLs are disabled, a random token is generated and it will - // never be closed due to expiry. - authToken := args.AuthToken - if authToken == "" { - authToken = uuid.Generate() - } subReq := &stream.SubscribeRequest{ - Token: authToken, + Token: args.AuthToken, Topics: args.Topics, Index: uint64(args.Index), Namespace: args.Namespace, } - publisher, err := e.srv.State().EventPublisher() + + // Check required ACL permissions for requested Topics + if aclObj != nil { + if err := aclCheckForEvents(subReq, aclObj); err != nil { + handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder) + return + } + } + + // Get the servers broker and subscribe + publisher, err := e.srv.State().EventBroker() if err != nil { handleJsonResultError(err, helper.Int64ToPtr(500), encoder) return @@ -86,23 +82,14 @@ func (e *Event) stream(conn io.ReadWriteCloser) { } defer subscription.Unsubscribe() - ndJsonCh := make(chan *structs.NDJson) errCh := make(chan error) - jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second) - jsonStream.Run(ctx) + jsonStream := stream.NewJsonStream(ctx, 30*time.Second) // goroutine to detect remote side closing go func() { - if _, err := conn.Read(nil); err != nil { - // One end of the pipe explicitly closed, exit - cancel() - return - } - select { - case <-ctx.Done(): - return - } + io.Copy(ioutil.Discard, conn) + cancel() }() go func() { @@ -140,7 +127,7 @@ OUTER: break OUTER case <-ctx.Done(): break OUTER - case eventJSON, ok := <-ndJsonCh: + case eventJSON, ok := <-jsonStream.OutCh(): // check if ndjson may have been closed when an error occurred, // check once more for an error. if !ok { @@ -209,3 +196,47 @@ func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) { Error: structs.NewRpcError(err, code), }) } + +func aclCheckForEvents(subReq *stream.SubscribeRequest, aclObj *acl.ACL) error { + if len(subReq.Topics) == 0 { + return fmt.Errorf("invalid topic request") + } + + reqPolicies := make(map[string]struct{}) + var required = struct{}{} + + for topic := range subReq.Topics { + switch topic { + case structs.TopicDeployment, structs.TopicEval, + structs.TopicAlloc, structs.TopicJob: + if _, ok := reqPolicies[acl.NamespaceCapabilityReadJob]; !ok { + reqPolicies[acl.NamespaceCapabilityReadJob] = required + } + case structs.TopicNode: + reqPolicies["node-read"] = required + case structs.TopicAll: + reqPolicies["management"] = required + default: + return fmt.Errorf("unknown topic %s", topic) + } + } + + for checks := range reqPolicies { + switch checks { + case acl.NamespaceCapabilityReadJob: + if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok { + return structs.ErrPermissionDenied + } + case "node-read": + if ok := aclObj.AllowNodeRead(); !ok { + return structs.ErrPermissionDenied + } + case "management": + if ok := aclObj.IsManagement(); !ok { + return structs.ErrPermissionDenied + } + } + } + + return nil +} diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 573084ff313..26757d37ca3 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -23,7 +23,7 @@ func TestEventStream(t *testing.T) { t.Parallel() s1, cleanupS1 := TestServer(t, func(c *Config) { - c.EnableEventPublisher = true + c.EnableEventBroker = true }) defer cleanupS1() @@ -65,7 +65,7 @@ func TestEventStream(t *testing.T) { }() // retrieve publisher for server, send event - publisher, err := s1.State().EventPublisher() + publisher, err := s1.State().EventBroker() require.NoError(t, err) node := mock.Node() @@ -75,7 +75,12 @@ func TestEventStream(t *testing.T) { encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(t, encoder.Encode(req)) + publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}}) + publisher.Publish(&structs.Events{Index: uint64(3), Events: []structs.Event{{Topic: "test", Payload: node}}}) + timeout := time.After(3 * time.Second) + got := 0 + want := 3 OUTER: for { select { @@ -89,7 +94,7 @@ OUTER: } // ignore heartbeat - if msg.Event == stream.NDJsonHeartbeat { + if msg.Event == stream.JsonHeartbeat { continue } @@ -107,7 +112,11 @@ OUTER: dec.Decode(event.Events[0].Payload) require.NoError(t, err) require.Equal(t, node.ID, out.ID) - break OUTER + + got++ + if got == want { + break OUTER + } } } } @@ -118,7 +127,7 @@ func TestEventStream_StreamErr(t *testing.T) { t.Parallel() s1, cleanupS1 := TestServer(t, func(c *Config) { - c.EnableEventPublisher = true + c.EnableEventBroker = true }) defer cleanupS1() @@ -158,7 +167,7 @@ func TestEventStream_StreamErr(t *testing.T) { } }() - publisher, err := s1.State().EventPublisher() + publisher, err := s1.State().EventBroker() require.NoError(t, err) node := mock.Node() @@ -200,12 +209,12 @@ func TestEventStream_RegionForward(t *testing.T) { t.Parallel() s1, cleanupS1 := TestServer(t, func(c *Config) { - c.EnableEventPublisher = true + c.EnableEventBroker = true }) defer cleanupS1() s2, cleanupS2 := TestServer(t, func(c *Config) { - c.EnableEventPublisher = true + c.EnableEventBroker = true c.Region = "foo" }) defer cleanupS2() @@ -249,7 +258,7 @@ func TestEventStream_RegionForward(t *testing.T) { }() // publish with server 2 - publisher, err := s2.State().EventPublisher() + publisher, err := s2.State().EventBroker() require.NoError(t, err) node := mock.Node() @@ -272,7 +281,7 @@ OUTER: t.Fatalf("Got error: %v", msg.Error.Error()) } - if msg.Event == stream.NDJsonHeartbeat { + if msg.Event == stream.JsonHeartbeat { continue } @@ -306,35 +315,124 @@ func TestEventStream_ACL(t *testing.T) { policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS}) tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + policyNsGood := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}) + tokenNsFoo := mock.CreatePolicyAndToken(t, s.State(), 1006, "valid", policyNsGood) + + policyNsNode := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob}) + policyNsNode += "\n" + mock.NodePolicy("read") + tokenNsNode := mock.CreatePolicyAndToken(t, s.State(), 1007, "validnNsNode", policyNsNode) + cases := []struct { Name string Token string + Topics map[structs.Topic][]string + Namespace string ExpectedErr string + PublishFn func(p *stream.EventBroker) }{ { - Name: "no token", - Token: "", + Name: "no token", + Token: "", + Topics: map[structs.Topic][]string{ + "*": {"*"}, + }, + ExpectedErr: structs.ErrPermissionDenied.Error(), + }, + { + Name: "bad token", + Token: tokenBad.SecretID, + Topics: map[structs.Topic][]string{ + "*": {"*"}, + }, + ExpectedErr: structs.ErrPermissionDenied.Error(), + }, + { + Name: "root token", + Token: root.SecretID, + Topics: map[structs.Topic][]string{ + "*": {"*"}, + }, + ExpectedErr: "subscription closed by server", + }, + { + Name: "job namespace token - correct ns", + Token: tokenNsFoo.SecretID, + Topics: map[structs.Topic][]string{ + "Job": {"*"}, + "Eval": {"*"}, + "Alloc": {"*"}, + "Deployment": {"*"}, + }, + Namespace: "foo", + ExpectedErr: "subscription closed by server", + PublishFn: func(p *stream.EventBroker) { + p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) + }, + }, + { + Name: "job namespace token - incorrect ns", + Token: tokenNsFoo.SecretID, + Topics: map[structs.Topic][]string{ + "Job": {"*"}, // good + }, + Namespace: "bar", // bad ExpectedErr: structs.ErrPermissionDenied.Error(), + PublishFn: func(p *stream.EventBroker) { + p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) + }, }, { - Name: "bad token", - Token: tokenBad.SecretID, + Name: "job namespace token - request management topic", + Token: tokenNsFoo.SecretID, + Topics: map[structs.Topic][]string{ + "*": {"*"}, // bad + }, + Namespace: "foo", ExpectedErr: structs.ErrPermissionDenied.Error(), + PublishFn: func(p *stream.EventBroker) { + p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) + }, }, { - Name: "root token", - Token: root.SecretID, + Name: "job namespace token - request invalid node topic", + Token: tokenNsFoo.SecretID, + Topics: map[structs.Topic][]string{ + "Eval": {"*"}, // good + "Node": {"*"}, // bad + }, + Namespace: "foo", + ExpectedErr: structs.ErrPermissionDenied.Error(), + PublishFn: func(p *stream.EventBroker) { + p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}}) + }, + }, + { + Name: "job+node namespace token, valid", + Token: tokenNsNode.SecretID, + Topics: map[structs.Topic][]string{ + "Eval": {"*"}, // good + "Node": {"*"}, // good + }, + Namespace: "foo", ExpectedErr: "subscription closed by server", + PublishFn: func(p *stream.EventBroker) { + p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Node", Payload: mock.Node()}}}) + }, }, } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { + var ns string + if tc.Namespace != "" { + ns = tc.Namespace + } // Create request for all topics and keys req := structs.EventStreamRequest{ - Topics: map[structs.Topic][]string{"*": {"*"}}, + Topics: tc.Topics, QueryOptions: structs.QueryOptions{ Region: s.Region(), + Namespace: ns, AuthToken: tc.Token, }, } @@ -372,20 +470,25 @@ func TestEventStream_ACL(t *testing.T) { encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) - publisher, err := s.State().EventPublisher() + publisher, err := s.State().EventBroker() require.NoError(err) // publish some events node := mock.Node() + publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}}) publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}}) + if tc.PublishFn != nil { + tc.PublishFn(publisher) + } + timeout := time.After(5 * time.Second) OUTER: for { select { case <-timeout: - require.Fail("timeout waiting for response") + t.Fatal("timeout waiting for events") case err := <-errCh: t.Fatal(err) case msg := <-streamMsg: @@ -398,7 +501,7 @@ func TestEventStream_ACL(t *testing.T) { if strings.Contains(msg.Error.Error(), tc.ExpectedErr) { break OUTER } else { - require.Fail("Unexpected error", msg.Error) + t.Fatalf("unexpected error %v", msg.Error) } } } diff --git a/nomad/fsm.go b/nomad/fsm.go index ead6d43e03f..d30c15d4528 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -8,6 +8,7 @@ import ( "time" metrics "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" @@ -102,7 +103,7 @@ type nomadFSM struct { type nomadSnapshot struct { snap *state.StateSnapshot timetable *TimeTable - durableEventCount int + durableEventCount int64 } // snapshotHeader is the first entry in our snapshot @@ -128,14 +129,17 @@ type FSMConfig struct { // Region is the region of the server embedding the FSM Region string - EnableEventPublisher bool + // EnableEventBroker specifies if the FSMs state store should enable + // it's event publisher. + EnableEventBroker bool + // EventBufferSize is the amount of messages to hold in memory EventBufferSize int64 // Durable count specifies the amount of events generated by the state store // to save to disk during snapshot generation. The most recent events // limited to count will be saved. - DurableEventCount int + DurableEventCount int64 } // NewFSMPath is used to construct a new FSM with a blank state @@ -144,7 +148,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { sconfig := &state.StateStoreConfig{ Logger: config.Logger, Region: config.Region, - EnablePublisher: config.EnableEventPublisher, + EnablePublisher: config.EnableEventBroker, EventBufferSize: config.EventBufferSize, DurableEventCount: config.DurableEventCount, } @@ -176,7 +180,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { // Close is used to cleanup resources associated with the FSM func (n *nomadFSM) Close() error { - n.state.StopEventPublisher() + n.state.StopEventBroker() return nil } @@ -1275,7 +1279,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { config := &state.StateStoreConfig{ Logger: n.config.Logger, Region: n.config.Region, - EnablePublisher: n.config.EnableEventPublisher, + EnablePublisher: n.config.EnableEventBroker, EventBufferSize: n.config.EventBufferSize, DurableEventCount: n.config.DurableEventCount, } @@ -1522,6 +1526,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } case EventSnapshot: + // If the event broker is disabled but the snapshot from potentially + // a remote server has events, ignore them + if !n.config.EnableEventBroker { + return nil + } + event := new(structs.Events) if err := dec.Decode(event); err != nil { return err @@ -1543,7 +1553,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { } } - restore.Commit() + if err := restore.Commit(); err != nil { + return err + } // COMPAT Remove in 0.10 // Clean up active deployments that do not have a job @@ -1565,8 +1577,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // Rehydrate the new state store's event publisher with the events // persisted in the snapshot - if n.config.EnableEventPublisher { - if err := rehydratePublisherFromState(n.state); err != nil { + if n.config.EnableEventBroker { + n.logger.Debug("Rehydrating event broker events from snapshot") + if err := rehydratePublisherFromState(n.state, n.logger); err != nil { n.logger.Error("Error re-hydrating event publisher during restore", "error", err) } } @@ -1574,8 +1587,11 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return nil } -func rehydratePublisherFromState(s *state.StateStore) error { - pub, err := s.EventPublisher() +// rehydratePublisherFromState is used during a snapshot restore to +// add the persisted events from that snapshot that were just added to memdb +// back into the event publisher +func rehydratePublisherFromState(s *state.StateStore, l hclog.Logger) error { + pub, err := s.EventBroker() if err != nil { return err } @@ -1584,6 +1600,7 @@ func rehydratePublisherFromState(s *state.StateStore) error { if err != nil { return err } + count := 0 for { raw := events.Next() if raw == nil { @@ -1591,7 +1608,10 @@ func rehydratePublisherFromState(s *state.StateStore) error { } e := raw.(*structs.Events) pub.Publish(e) + count++ } + + l.Debug("finished hydrating event broker from snapshot", "events", count) return nil } @@ -2386,7 +2406,7 @@ func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Enc return err } - count := 0 + var count int64 for { // Get the next item raw := events.Next() @@ -2397,14 +2417,12 @@ func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Enc // Prepare the request struct event := raw.(*structs.Events) - eventCount := len(event.Events) - // Write out a volume snapshot sink.Write([]byte{byte(EventSnapshot)}) if err := encoder.Encode(event); err != nil { return err } - count += eventCount + count += int64(len(event.Events)) // Only write to sink until durableCount has been reached if count >= s.durableEventCount { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b1f8d83dd2e..2557e59e86a 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -50,13 +50,13 @@ func testFSM(t *testing.T) *nomadFSM { dispatcher, _ := testPeriodicDispatcher(t) logger := testlog.HCLogger(t) fsmConfig := &FSMConfig{ - EvalBroker: broker, - Periodic: dispatcher, - Blocked: NewBlockedEvals(broker, logger), - Logger: logger, - Region: "global", - EnableEventPublisher: true, - EventBufferSize: 100, + EvalBroker: broker, + Periodic: dispatcher, + Blocked: NewBlockedEvals(broker, logger), + Logger: logger, + Region: "global", + EnableEventBroker: true, + EventBufferSize: 100, } fsm, err := NewFSM(fsmConfig) if err != nil { @@ -3206,7 +3206,7 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) { t.Parallel() // Add some state fsm := testFSM(t) - fsm.config.EnableEventPublisher = true + fsm.config.EnableEventBroker = true // DurableEventCount = 4 each mock events wrapper contains 2 events fsm.config.DurableEventCount = 4 @@ -3246,7 +3246,7 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) { raw1 := iter.Next() require.Nil(t, raw1) - pub, err := state2.EventPublisher() + pub, err := state2.EventBroker() require.NoError(t, err) testutil.WaitForResult(func() (bool, error) { @@ -3264,7 +3264,7 @@ func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) { t.Parallel() fsm := testFSM(t) // Enable event publisher with durable event count of zero - fsm.config.EnableEventPublisher = true + fsm.config.EnableEventBroker = true fsm.config.DurableEventCount = 0 state := fsm.State() diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 0f685dd67c2..23baba2e41c 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1917,7 +1917,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { if time.Since(start) < 100*time.Millisecond { t.Fatalf("too fast") } - assert.EqualValues(200, int(resp3.Index)) + assert.EqualValues(200, resp3.Index) if assert.Len(resp3.Allocs, 1) { assert.EqualValues(100, resp3.Allocs[alloc1.ID]) } diff --git a/nomad/server.go b/nomad/server.go index 29757dd3c08..44de55471b9 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1211,13 +1211,14 @@ func (s *Server) setupRaft() error { // Create the FSM fsmConfig := &FSMConfig{ - EvalBroker: s.evalBroker, - Periodic: s.periodicDispatcher, - Blocked: s.blockedEvals, - Logger: s.logger, - Region: s.Region(), - EnableEventPublisher: s.config.EnableEventPublisher, - EventBufferSize: s.config.EventBufferSize, + EvalBroker: s.evalBroker, + Periodic: s.periodicDispatcher, + Blocked: s.blockedEvals, + Logger: s.logger, + Region: s.Region(), + EnableEventBroker: s.config.EnableEventBroker, + DurableEventCount: s.config.DurableEventCount, + EventBufferSize: s.config.EventBufferSize, } var err error s.fsm, err = NewFSM(fsmConfig) diff --git a/nomad/state/deployment_events_test.go b/nomad/state/deployment_events_test.go index bbaeb7996ea..08e06e45a8d 100644 --- a/nomad/state/deployment_events_test.go +++ b/nomad/state/deployment_events_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" @@ -15,7 +14,7 @@ import ( func TestDeploymentEventFromChanges(t *testing.T) { t.Parallel() s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventPublisher() + defer s.StopEventBroker() // setup setupTx := s.db.WriteTxn(10) @@ -59,82 +58,6 @@ func TestDeploymentEventFromChanges(t *testing.T) { } -func TestDeploymentEventFromChanges_Promotion(t *testing.T) { - t.Parallel() - s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventPublisher() - - // setup - setupTx := s.db.WriteTxn(10) - - j := mock.Job() - tg1 := j.TaskGroups[0] - tg2 := tg1.Copy() - tg2.Name = "foo" - j.TaskGroups = append(j.TaskGroups, tg2) - require.NoError(t, s.upsertJobImpl(10, j, false, setupTx)) - - d := mock.Deployment() - d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion - d.JobID = j.ID - d.TaskGroups = map[string]*structs.DeploymentState{ - "web": { - DesiredTotal: 10, - DesiredCanaries: 1, - }, - "foo": { - DesiredTotal: 10, - DesiredCanaries: 1, - }, - } - require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) - - // create set of allocs - c1 := mock.Alloc() - c1.JobID = j.ID - c1.DeploymentID = d.ID - d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID) - c1.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(true), - } - c2 := mock.Alloc() - c2.JobID = j.ID - c2.DeploymentID = d.ID - d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID) - c2.TaskGroup = tg2.Name - c2.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(true), - } - - require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx)) - - // commit setup transaction - setupTx.Txn.Commit() - - e := mock.Eval() - // Request to promote canaries - msgType := structs.DeploymentPromoteRequestType - req := &structs.ApplyDeploymentPromoteRequest{ - DeploymentPromoteRequest: structs.DeploymentPromoteRequest{ - DeploymentID: d.ID, - All: true, - }, - Eval: e, - } - - require.NoError(t, s.UpdateDeploymentPromotion(msgType, 100, req)) - - events := WaitForEvents(t, s, 100, 1, 1*time.Second) - require.Len(t, events, 4) - - got := events[0] - require.Equal(t, uint64(100), got.Index) - require.Equal(t, d.ID, got.Key) - - de := got.Payload.(*DeploymentEvent) - require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status) -} - func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, timeout time.Duration) []structs.Event { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -163,15 +86,19 @@ func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, tim } func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event { - pub, err := s.EventPublisher() + pub, err := s.EventBroker() require.NoError(t, err) sub, err := pub.Subscribe(&stream.SubscribeRequest{ Topics: map[structs.Topic][]string{ - "*": []string{"*"}, + "*": {"*"}, }, - Index: index, + Index: index, + StartExactlyAtIndex: true, }) + if err != nil { + return []structs.Event{} + } defer sub.Unsubscribe() require.NoError(t, err) diff --git a/nomad/state/events.go b/nomad/state/events.go index f396975f67a..a5a65a02b8c 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -7,22 +7,11 @@ import ( ) const ( - TopicDeployment structs.Topic = "Deployment" - TopicEval structs.Topic = "Eval" - TopicAlloc structs.Topic = "Alloc" - TopicJob structs.Topic = "Job" - // TopicNodeRegistration stream.Topic = "NodeRegistration" - // TopicNodeDeregistration stream.Topic = "NodeDeregistration" - // TopicNodeDrain stream.Topic = "NodeDrain" - TopicNode structs.Topic = "Node" - - // TODO(drew) Node Events use TopicNode + Type - TypeNodeRegistration = "NodeRegistration" - TypeNodeDeregistration = "NodeDeregistration" - TypeNodeEligibilityUpdate = "NodeEligibility" - TypeNodeDrain = "NodeDrain" - TypeNodeEvent = "NodeEvent" - + TypeNodeRegistration = "NodeRegistration" + TypeNodeDeregistration = "NodeDeregistration" + TypeNodeEligibilityUpdate = "NodeEligibility" + TypeNodeDrain = "NodeDrain" + TypeNodeEvent = "NodeEvent" TypeDeploymentUpdate = "DeploymentStatusUpdate" TypeDeploymentPromotion = "DeploymentPromotion" TypeDeploymentAllocHealth = "DeploymentAllocHealth" @@ -36,22 +25,28 @@ const ( TypePlanResult = "PlanResult" ) +// JobEvent holds a newly updated Job. type JobEvent struct { Job *structs.Job } +// EvalEvent holds a newly updated Eval. type EvalEvent struct { Eval *structs.Evaluation } +// AllocEvent holds a newly updated Allocation. The +// Allocs embedded Job has been removed to reduce size. type AllocEvent struct { Alloc *structs.Allocation } +// DeploymentEvent holds a newly updated Deployment. type DeploymentEvent struct { Deployment *structs.Deployment } +// NodeEvent holds a newly updated Node type NodeEvent struct { Node *structs.Node } @@ -74,43 +69,30 @@ type JobDrainDetails struct { AllocDetails map[string]NodeDrainAllocDetails } +var MsgTypeEvents = map[structs.MessageType]string{ + structs.NodeRegisterRequestType: TypeNodeRegistration, + structs.UpsertNodeEventsType: TypeNodeEvent, + structs.EvalUpdateRequestType: TypeEvalUpdated, + structs.AllocClientUpdateRequestType: TypeAllocUpdated, + structs.JobRegisterRequestType: TypeJobRegistered, + structs.AllocUpdateRequestType: TypeAllocUpdated, + structs.NodeUpdateStatusRequestType: TypeNodeEvent, + structs.JobDeregisterRequestType: TypeJobDeregistered, + structs.JobBatchDeregisterRequestType: TypeJobBatchDeregistered, + structs.AllocUpdateDesiredTransitionRequestType: TypeAllocUpdateDesiredStatus, + structs.NodeUpdateEligibilityRequestType: TypeNodeDrain, + structs.BatchNodeUpdateDrainRequestType: TypeNodeDrain, + structs.DeploymentStatusUpdateRequestType: TypeDeploymentUpdate, + structs.DeploymentPromoteRequestType: TypeDeploymentPromotion, + structs.DeploymentAllocHealthRequestType: TypeDeploymentAllocHealth, + structs.ApplyPlanResultsRequestType: TypePlanResult, +} + +// GenericEventsFromChanges returns a set of events for a given set of +// transaction changes. It currently ignores Delete operations. func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { - var eventType string - switch changes.MsgType { - case structs.NodeRegisterRequestType: - eventType = TypeNodeRegistration - case structs.UpsertNodeEventsType: - eventType = TypeNodeEvent - case structs.EvalUpdateRequestType: - eventType = TypeEvalUpdated - case structs.AllocClientUpdateRequestType: - eventType = TypeAllocUpdated - case structs.JobRegisterRequestType: - eventType = TypeJobRegistered - case structs.AllocUpdateRequestType: - eventType = TypeAllocUpdated - case structs.NodeUpdateStatusRequestType: - eventType = TypeNodeEvent - case structs.JobDeregisterRequestType: - eventType = TypeJobDeregistered - case structs.JobBatchDeregisterRequestType: - eventType = TypeJobBatchDeregistered - case structs.AllocUpdateDesiredTransitionRequestType: - eventType = TypeAllocUpdateDesiredStatus - case structs.NodeUpdateEligibilityRequestType: - eventType = TypeNodeDrain - case structs.BatchNodeUpdateDrainRequestType: - eventType = TypeNodeDrain - case structs.DeploymentStatusUpdateRequestType: - eventType = TypeDeploymentUpdate - case structs.DeploymentPromoteRequestType: - eventType = TypeDeploymentPromotion - case structs.DeploymentAllocHealthRequestType: - eventType = TypeDeploymentAllocHealth - case structs.ApplyPlanResultsRequestType: - eventType = TypePlanResult - default: - // unknown request type + eventType, ok := MsgTypeEvents[changes.MsgType] + if !ok { return nil, nil } @@ -127,7 +109,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err } event := structs.Event{ - Topic: TopicEval, + Topic: structs.TopicEval, Type: eventType, Index: changes.Index, Key: after.ID, @@ -149,15 +131,22 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err } alloc := after.Copy() + + filterKeys := []string{ + alloc.JobID, + alloc.DeploymentID, + } + // remove job info to help keep size of alloc event down alloc.Job = nil event := structs.Event{ - Topic: TopicAlloc, - Type: eventType, - Index: changes.Index, - Key: after.ID, - Namespace: after.Namespace, + Topic: structs.TopicAlloc, + Type: eventType, + Index: changes.Index, + Key: after.ID, + FilterKeys: filterKeys, + Namespace: after.Namespace, Payload: &AllocEvent{ Alloc: alloc, }, @@ -174,7 +163,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err } event := structs.Event{ - Topic: TopicAlloc, + Topic: structs.TopicJob, Type: eventType, Index: changes.Index, Key: after.ID, @@ -195,7 +184,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err } event := structs.Event{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: eventType, Index: changes.Index, Key: after.ID, @@ -214,7 +203,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err } event := structs.Event{ - Topic: TopicNode, + Topic: structs.TopicDeployment, Type: eventType, Index: changes.Index, Key: after.ID, diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go new file mode 100644 index 00000000000..ba93adedb95 --- /dev/null +++ b/nomad/state/events_test.go @@ -0,0 +1,567 @@ +package state + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +// structs.AllocClientUpdateRequestType: +// structs.AllocUpdateRequestType +// JobDeregisterRequestType +// jobregisterrequesttype + +func TestGenericEventsFromChanges_DeploymentUpdate(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + setupTx := s.db.WriteTxn(10) + + j := mock.Job() + e := mock.Eval() + e.JobID = j.ID + + d := mock.Deployment() + d.JobID = j.ID + + require.NoError(t, s.upsertJobImpl(10, j, false, setupTx)) + require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) + + setupTx.Txn.Commit() + + msgType := structs.DeploymentStatusUpdateRequestType + + req := &structs.DeploymentStatusUpdateRequest{ + DeploymentUpdate: &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusPaused, + StatusDescription: structs.DeploymentStatusDescriptionPaused, + }, + Eval: e, + // Exlude Job and assert its added + } + + require.NoError(t, s.UpdateDeploymentStatus(msgType, 100, req)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + got := events[0] + require.Equal(t, uint64(100), got.Index) + require.Equal(t, d.ID, got.Key) + + de := got.Payload.(*DeploymentEvent) + require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status) + require.Contains(t, got.FilterKeys, j.ID) +} + +func TestGenericEventsFromChanges_DeploymentPromotion(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + setupTx := s.db.WriteTxn(10) + + j := mock.Job() + tg1 := j.TaskGroups[0] + tg2 := tg1.Copy() + tg2.Name = "foo" + j.TaskGroups = append(j.TaskGroups, tg2) + require.NoError(t, s.upsertJobImpl(10, j, false, setupTx)) + + d := mock.Deployment() + d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion + d.JobID = j.ID + d.TaskGroups = map[string]*structs.DeploymentState{ + "web": { + DesiredTotal: 10, + DesiredCanaries: 1, + }, + "foo": { + DesiredTotal: 10, + DesiredCanaries: 1, + }, + } + require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) + + // create set of allocs + c1 := mock.Alloc() + c1.JobID = j.ID + c1.DeploymentID = d.ID + d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID) + c1.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + c2 := mock.Alloc() + c2.JobID = j.ID + c2.DeploymentID = d.ID + d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID) + c2.TaskGroup = tg2.Name + c2.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + + require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx)) + + // commit setup transaction + setupTx.Txn.Commit() + + e := mock.Eval() + // Request to promote canaries + msgType := structs.DeploymentPromoteRequestType + req := &structs.ApplyDeploymentPromoteRequest{ + DeploymentPromoteRequest: structs.DeploymentPromoteRequest{ + DeploymentID: d.ID, + All: true, + }, + Eval: e, + } + + require.NoError(t, s.UpdateDeploymentPromotion(msgType, 100, req)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 4) + + got := events[0] + require.Equal(t, uint64(100), got.Index) + require.Equal(t, d.ID, got.Key) + + de := got.Payload.(*DeploymentEvent) + require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status) + require.Equal(t, TypeDeploymentPromotion, got.Type) +} + +func TestGenericEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + setupTx := s.db.WriteTxn(10) + + j := mock.Job() + tg1 := j.TaskGroups[0] + tg2 := tg1.Copy() + tg2.Name = "foo" + j.TaskGroups = append(j.TaskGroups, tg2) + require.NoError(t, s.upsertJobImpl(10, j, false, setupTx)) + + d := mock.Deployment() + d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion + d.JobID = j.ID + d.TaskGroups = map[string]*structs.DeploymentState{ + "web": { + DesiredTotal: 10, + DesiredCanaries: 1, + }, + "foo": { + DesiredTotal: 10, + DesiredCanaries: 1, + }, + } + require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) + + // create set of allocs + c1 := mock.Alloc() + c1.JobID = j.ID + c1.DeploymentID = d.ID + d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID) + c1.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + c2 := mock.Alloc() + c2.JobID = j.ID + c2.DeploymentID = d.ID + d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID) + c2.TaskGroup = tg2.Name + c2.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + + require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx)) + + // Commit setup + setupTx.Commit() + + msgType := structs.DeploymentAllocHealthRequestType + + req := &structs.ApplyDeploymentAllocHealthRequest{ + DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{ + DeploymentID: d.ID, + HealthyAllocationIDs: []string{c1.ID}, + UnhealthyAllocationIDs: []string{c2.ID}, + }, + DeploymentUpdate: &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + }, + } + + require.NoError(t, s.UpdateDeploymentAllocHealth(msgType, 100, req)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 3) + + var allocEvents []structs.Event + var deploymentEvent []structs.Event + for _, e := range events { + if e.Topic == structs.TopicAlloc { + allocEvents = append(allocEvents, e) + } else if e.Topic == structs.TopicDeployment { + deploymentEvent = append(deploymentEvent, e) + } + } + + require.Len(t, allocEvents, 2) + for _, e := range allocEvents { + require.Equal(t, 100, int(e.Index)) + require.Equal(t, TypeDeploymentAllocHealth, e.Type) + require.Equal(t, structs.TopicAlloc, e.Topic) + } + + require.Len(t, deploymentEvent, 1) + for _, e := range deploymentEvent { + require.Equal(t, 100, int(e.Index)) + require.Equal(t, TypeDeploymentAllocHealth, e.Type) + require.Equal(t, structs.TopicDeployment, e.Topic) + require.Equal(t, d.ID, e.Key) + } +} + +func TestGenericEventsFromChanges_UpsertNodeEventsType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + n1 := mock.Node() + n2 := mock.Node() + + require.NoError(t, s.UpsertNode(10, n1)) + require.NoError(t, s.UpsertNode(12, n2)) + + msgType := structs.UpsertNodeEventsType + req := &structs.EmitNodeEventsRequest{ + NodeEvents: map[string][]*structs.NodeEvent{ + n1.ID: { + { + Message: "update", + }, + }, + n2.ID: { + { + Message: "update", + }, + }, + }, + } + + require.NoError(t, s.UpsertNodeEventsMsgType(msgType, 100, req.NodeEvents)) + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + for _, e := range events { + require.Equal(t, structs.TopicNode, e.Topic) + require.Equal(t, TypeNodeEvent, e.Type) + event := e.Payload.(*NodeEvent) + require.Equal(t, "update", event.Node.Events[len(event.Node.Events)-1].Message) + } + +} + +func TestGenericEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + n1 := mock.Node() + + require.NoError(t, s.UpsertNode(10, n1)) + + updated := time.Now() + msgType := structs.NodeUpdateStatusRequestType + req := &structs.NodeUpdateStatusRequest{ + NodeID: n1.ID, + Status: structs.NodeStatusDown, + UpdatedAt: updated.UnixNano(), + NodeEvent: &structs.NodeEvent{Message: "down"}, + } + + require.NoError(t, s.UpdateNodeStatus(msgType, 100, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent)) + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 1) + + e := events[0] + require.Equal(t, structs.TopicNode, e.Topic) + require.Equal(t, TypeNodeEvent, e.Type) + event := e.Payload.(*NodeEvent) + require.Equal(t, "down", event.Node.Events[len(event.Node.Events)-1].Message) + require.Equal(t, structs.NodeStatusDown, event.Node.Status) +} + +func TestGenericEventsFromChanges_EvalUpdateRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + e1 := mock.Eval() + + require.NoError(t, s.UpsertEvals(10, []*structs.Evaluation{e1})) + + e2 := mock.Eval() + e2.ID = e1.ID + e2.JobID = e1.JobID + e2.Status = structs.EvalStatusBlocked + + msgType := structs.EvalUpdateRequestType + req := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{e2}, + } + + require.NoError(t, s.UpsertEvalsMsgType(msgType, 100, req.Evals)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 1) + + e := events[0] + require.Equal(t, structs.TopicEval, e.Topic) + require.Equal(t, TypeEvalUpdated, e.Type) + event := e.Payload.(*EvalEvent) + require.Equal(t, "blocked", event.Eval.Status) +} + +func TestGenericEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + alloc := mock.Alloc() + alloc2 := mock.Alloc() + job := alloc.Job + alloc.Job = nil + alloc2.Job = nil + + d := mock.Deployment() + alloc.DeploymentID = d.ID + alloc2.DeploymentID = d.ID + + require.NoError(t, s.UpsertJob(9, job)) + + eval := mock.Eval() + eval.JobID = job.ID + + // Create an eval + require.NoError(t, s.UpsertEvals(10, []*structs.Evaluation{eval})) + + msgType := structs.ApplyPlanResultsRequestType + req := &structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc, alloc2}, + Job: job, + }, + Deployment: d, + EvalID: eval.ID, + } + + require.NoError(t, s.UpsertPlanResults(msgType, 100, req)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 5) + + var allocs []structs.Event + var evals []structs.Event + var jobs []structs.Event + var deploys []structs.Event + for _, e := range events { + if e.Topic == structs.TopicAlloc { + allocs = append(allocs, e) + } else if e.Topic == structs.TopicEval { + evals = append(evals, e) + } else if e.Topic == structs.TopicJob { + jobs = append(jobs, e) + } else if e.Topic == structs.TopicDeployment { + deploys = append(deploys, e) + } + require.Equal(t, TypePlanResult, e.Type) + } + require.Len(t, allocs, 2) + require.Len(t, evals, 1) + require.Len(t, jobs, 1) + require.Len(t, deploys, 1) +} + +func TestGenericEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + n1 := mock.Node() + n2 := mock.Node() + + require.NoError(t, s.UpsertNode(10, n1)) + require.NoError(t, s.UpsertNode(11, n2)) + + updated := time.Now() + msgType := structs.BatchNodeUpdateDrainRequestType + + expectedDrain := &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: -1 * time.Second, + }, + } + event := &structs.NodeEvent{ + Message: "Drain strategy enabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + } + req := structs.BatchNodeUpdateDrainRequest{ + Updates: map[string]*structs.DrainUpdate{ + n1.ID: { + DrainStrategy: expectedDrain, + }, + n2.ID: { + DrainStrategy: expectedDrain, + }, + }, + NodeEvents: map[string]*structs.NodeEvent{ + n1.ID: event, + n2.ID: event, + }, + UpdatedAt: updated.UnixNano(), + } + + require.NoError(t, s.BatchUpdateNodeDrain(msgType, 100, req.UpdatedAt, req.Updates, req.NodeEvents)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + for _, e := range events { + require.Equal(t, 100, int(e.Index)) + require.Equal(t, TypeNodeDrain, e.Type) + require.Equal(t, structs.TopicNode, e.Topic) + ne := e.Payload.(*NodeEvent) + require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message) + } +} + +func TestGenericEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + n1 := mock.Node() + + require.NoError(t, s.UpsertNode(10, n1)) + + msgType := structs.NodeUpdateEligibilityRequestType + + event := &structs.NodeEvent{ + Message: "Node marked as ineligible", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + + req := structs.NodeUpdateEligibilityRequest{ + NodeID: n1.ID, + NodeEvent: event, + Eligibility: structs.NodeSchedulingIneligible, + UpdatedAt: time.Now().UnixNano(), + } + + require.NoError(t, s.UpdateNodeEligibility(msgType, 100, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 1) + + for _, e := range events { + require.Equal(t, 100, int(e.Index)) + require.Equal(t, TypeNodeDrain, e.Type) + require.Equal(t, structs.TopicNode, e.Topic) + ne := e.Payload.(*NodeEvent) + require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message) + require.Equal(t, structs.NodeSchedulingIneligible, ne.Node.SchedulingEligibility) + } +} + +func TestGenericEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + alloc := mock.Alloc() + + require.Nil(t, s.UpsertJob(10, alloc.Job)) + require.Nil(t, s.UpsertAllocs(11, []*structs.Allocation{alloc})) + + msgType := structs.AllocUpdateDesiredTransitionRequestType + + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: alloc.Namespace, + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerNodeDrain, + JobID: alloc.Job.ID, + JobModifyIndex: alloc.Job.ModifyIndex, + Status: structs.EvalStatusPending, + } + evals := []*structs.Evaluation{eval} + + req := &structs.AllocUpdateDesiredTransitionRequest{ + Allocs: map[string]*structs.DesiredTransition{ + alloc.ID: {Migrate: helper.BoolToPtr(true)}, + }, + Evals: evals, + } + + require.NoError(t, s.UpdateAllocsDesiredTransitions(msgType, 100, req.Allocs, req.Evals)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + var allocs []structs.Event + var evalEvents []structs.Event + for _, e := range events { + if e.Topic == structs.TopicEval { + evalEvents = append(evalEvents, e) + } else if e.Topic == structs.TopicAlloc { + allocs = append(allocs, e) + } else { + require.Fail(t, "unexpected event type") + } + + require.Equal(t, TypeAllocUpdateDesiredStatus, e.Type) + } + + require.Len(t, allocs, 1) + require.Len(t, evalEvents, 1) +} + +func TestGenericEventsFromChanges_JobBatchDeregisterRequestType(t *testing.T) { + // TODO Job batch deregister logic mostly occurs in the FSM + t.SkipNow() + +} +func TestGenericEventsFromChanges_AllocClientUpdateRequestType(t *testing.T) { + t.SkipNow() +} + +func TestGenericEventsFromChanges_AllocUpdateRequestType(t *testing.T) { + t.SkipNow() +} + +func TestGenericEventsFromChanges_JobDeregisterRequestType(t *testing.T) { + t.SkipNow() +} diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go index 4742b1aa587..178262fb99d 100644 --- a/nomad/state/node_events.go +++ b/nomad/state/node_events.go @@ -19,7 +19,7 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (*structs.Event } event := structs.Event{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Index: changes.Index, Key: before.ID, @@ -66,7 +66,7 @@ func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, er } event := structs.Event{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDrain, Index: changes.Index, Key: after.ID, diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go index 86874814c36..2789ae80f33 100644 --- a/nomad/state/node_events_test.go +++ b/nomad/state/node_events_test.go @@ -21,13 +21,13 @@ func TestNodeEventsFromChanges(t *testing.T) { }{ { MsgType: structs.NodeRegisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "node registered", Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode()) }, WantEvents: []structs.Event{{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeRegistration, Key: testNodeID(), Index: 100, @@ -39,13 +39,13 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.NodeRegisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "node registered initializing", Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady)) }, WantEvents: []structs.Event{{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeRegistration, Key: testNodeID(), Index: 100, @@ -57,7 +57,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.NodeDeregisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "node deregistered", Setup: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode()) @@ -66,7 +66,7 @@ func TestNodeEventsFromChanges(t *testing.T) { return deleteNodeTxn(tx, tx.Index, []string{testNodeID()}) }, WantEvents: []structs.Event{{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Key: testNodeID(), Index: 100, @@ -78,7 +78,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.NodeDeregisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "batch node deregistered", Setup: func(s *StateStore, tx *txn) error { require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) @@ -89,7 +89,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, WantEvents: []structs.Event{ { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Key: testNodeID(), Index: 100, @@ -98,7 +98,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, }, { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Key: testNodeIDTwo(), Index: 100, @@ -111,7 +111,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.UpsertNodeEventsType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "batch node events upserted", Setup: func(s *StateStore, tx *txn) error { require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) @@ -141,7 +141,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, WantEvents: []structs.Event{ { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeEvent, Key: testNodeID(), Index: 100, @@ -150,7 +150,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, }, { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeEvent, Key: testNodeIDTwo(), Index: 100, @@ -166,7 +166,7 @@ func TestNodeEventsFromChanges(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventPublisher() + defer s.StopEventBroker() if tc.Setup != nil { // Bypass publish mechanism for setup @@ -215,7 +215,7 @@ func TestNodeEventsFromChanges(t *testing.T) { func TestNodeDrainEventFromChanges(t *testing.T) { t.Parallel() s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventPublisher() + defer s.StopEventBroker() // setup setupTx := s.db.WriteTxn(10) @@ -251,7 +251,7 @@ func TestNodeDrainEventFromChanges(t *testing.T) { require.Len(t, got.Events, 1) - require.Equal(t, TopicNode, got.Events[0].Topic) + require.Equal(t, structs.TopicNode, got.Events[0].Topic) require.Equal(t, TypeNodeDrain, got.Events[0].Type) require.Equal(t, uint64(100), got.Events[0].Index) @@ -296,10 +296,6 @@ func nodeNotReady(n *structs.Node) { n.Status = structs.NodeStatusInit } -func nodeReady(n *structs.Node) { - n.Status = structs.NodeStatusReady -} - func nodeIDTwo(n *structs.Node) { n.ID = testNodeIDTwo() } diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index c94545d30d8..c7a2d9634e3 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -27,29 +27,20 @@ type Changes struct { // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are -// sent to the EventPublisher which will create and emit change events. +// sent to the EventBroker which will create and emit change events. type changeTrackerDB struct { - db *memdb.MemDB + memdb *memdb.MemDB durableCount int64 - publisher *stream.EventPublisher + publisher *stream.EventBroker processChanges func(ReadTxn, Changes) (*structs.Events, error) } -// ChangeConfig -type ChangeConfig struct { - DurableEventCount int -} - -func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor, cfg *ChangeConfig) *changeTrackerDB { - if cfg == nil { - cfg = &ChangeConfig{} - } - +func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesFn changeProcessor, durableCount int64) *changeTrackerDB { return &changeTrackerDB{ - db: db, + memdb: db, publisher: publisher, processChanges: changesFn, - durableCount: int64(cfg.DurableEventCount), + durableCount: durableCount, } } @@ -63,7 +54,7 @@ func noOpProcessChanges(ReadTxn, Changes) (*structs.Events, error) { return nil, // TODO: this could return a regular memdb.Txn if all the state functions accepted // the ReadTxn interface func (c *changeTrackerDB) ReadTxn() *txn { - return &txn{Txn: c.db.Txn(false)} + return &txn{Txn: c.memdb.Txn(false)} } // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. @@ -78,7 +69,7 @@ func (c *changeTrackerDB) ReadTxn() *txn { // data directly into the DB. These cases may use WriteTxnRestore. func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: idx, publish: c.publish, } @@ -91,7 +82,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) t := &txn{ msgType: msgType, - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: idx, publish: c.publish, persistChanges: persistChanges, @@ -101,7 +92,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) } func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) { - readOnlyTx := c.db.Txn(false) + readOnlyTx := c.memdb.Txn(false) defer readOnlyTx.Abort() events, err := c.processChanges(readOnlyTx, changes) @@ -123,12 +114,12 @@ func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) { // written across many indexes. func (c *changeTrackerDB) WriteTxnRestore() *txn { return &txn{ - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: 0, } } -// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. +// txn wraps a memdb.Txn to capture changes and send them to the EventBroker. // // This can not be done with txn.Defer because the callback passed to Defer is // invoked after commit completes, and because the callback can not return an @@ -149,7 +140,7 @@ type txn struct { publish func(changes Changes) (*structs.Events, error) } -// Commit first pushes changes to EventPublisher, then calls Commit on the +// Commit first pushes changes to EventBroker, then calls Commit on the // underlying transaction. // // Note that this function, unlike memdb.Txn, returns an error which must be checked @@ -195,51 +186,11 @@ func processDBChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { case structs.IgnoreUnknownTypeFlag: // unknown event type return nil, nil - case structs.NodeRegisterRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.NodeUpdateStatusRequestType: - // TODO(drew) test - return GenericEventsFromChanges(tx, changes) case structs.NodeDeregisterRequestType: return NodeDeregisterEventFromChanges(tx, changes) case structs.NodeUpdateDrainRequestType: return NodeDrainEventFromChanges(tx, changes) - case structs.UpsertNodeEventsType: - return GenericEventsFromChanges(tx, changes) - case structs.DeploymentStatusUpdateRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.DeploymentPromoteRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.DeploymentAllocHealthRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.ApplyPlanResultsRequestType: - // TODO test - return GenericEventsFromChanges(tx, changes) - case structs.EvalUpdateRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.AllocClientUpdateRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.JobRegisterRequestType: - // TODO(drew) test - return GenericEventsFromChanges(tx, changes) - case structs.AllocUpdateRequestType: - // TODO(drew) test - return GenericEventsFromChanges(tx, changes) - case structs.JobDeregisterRequestType: - // TODO(drew) test / handle delete - return GenericEventsFromChanges(tx, changes) - case structs.JobBatchDeregisterRequestType: - // TODO(drew) test & handle delete - return GenericEventsFromChanges(tx, changes) - case structs.AllocUpdateDesiredTransitionRequestType: - // TODO(drew) drain - return GenericEventsFromChanges(tx, changes) - case structs.NodeUpdateEligibilityRequestType: - // TODO(drew) test, drain - return GenericEventsFromChanges(tx, changes) - case structs.BatchNodeUpdateDrainRequestType: - // TODO(drew) test, drain + default: return GenericEventsFromChanges(tx, changes) } - return nil, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b50c809308d..ebb3f0a661d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -54,7 +54,7 @@ type StateStoreConfig struct { // DurableEventCount is used to determine if events from transaction changes // should be saved in go-memdb - DurableEventCount int + DurableEventCount int64 } // The StateStore is responsible for maintaining all the Nomad @@ -77,7 +77,7 @@ type StateStore struct { // TODO: refactor abondonCh to use a context so that both can use the same // cancel mechanism. - stopEventPublisher func() + stopEventBroker func() } // NewStateStore is used to create a new state store @@ -91,27 +91,22 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { // Create the state store ctx, cancel := context.WithCancel(context.TODO()) s := &StateStore{ - logger: config.Logger.Named("state_store"), - config: config, - abandonCh: make(chan struct{}), - stopEventPublisher: cancel, + logger: config.Logger.Named("state_store"), + config: config, + abandonCh: make(chan struct{}), + stopEventBroker: cancel, } if config.EnablePublisher { - cfg := &ChangeConfig{ - DurableEventCount: config.DurableEventCount, - } - // Create new event publisher using provided config - publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ - EventBufferTTL: 1 * time.Hour, + broker := stream.NewEventBroker(ctx, stream.EventBrokerCfg{ EventBufferSize: config.EventBufferSize, Logger: config.Logger, - OnEvict: s.eventPublisherEvict, + OnEvict: s.eventBrokerEvict, }) - s.db = NewChangeTrackerDB(db, publisher, processDBChanges, cfg) + s.db = NewChangeTrackerDB(db, broker, processDBChanges, config.DurableEventCount) } else { - s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges, nil) + s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges, 0) } // Initialize the state store with required enterprise objects @@ -122,16 +117,16 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { return s, nil } -func (s *StateStore) EventPublisher() (*stream.EventPublisher, error) { +func (s *StateStore) EventBroker() (*stream.EventBroker, error) { if s.db.publisher == nil { - return nil, fmt.Errorf("EventPublisher not configured") + return nil, fmt.Errorf("EventBroker not configured") } return s.db.publisher, nil } -// eventPublisherEvict is used as a callback to delete an evicted events +// eventBrokerEvict is used as a callback to delete an evicted events // entry from go-memdb. -func (s *StateStore) eventPublisherEvict(events *structs.Events) { +func (s *StateStore) eventBrokerEvict(events *structs.Events) { if err := s.deleteEvent(events); err != nil { if err == memdb.ErrNotFound { s.logger.Info("Evicted event was not found in go-memdb table", "event index", events.Index) @@ -142,7 +137,7 @@ func (s *StateStore) eventPublisherEvict(events *structs.Events) { } func (s *StateStore) deleteEvent(events *structs.Events) error { - txn := s.db.db.Txn(true) + txn := s.db.memdb.Txn(true) defer txn.Abort() if err := txn.Delete("events", events); err != nil { @@ -162,7 +157,7 @@ func (s *StateStore) Config() *StateStoreConfig { // we use MemDB, we just need to snapshot the state of the underlying // database. func (s *StateStore) Snapshot() (*StateSnapshot, error) { - memDBSnap := s.db.db.Snapshot() + memDBSnap := s.db.memdb.Snapshot() store := StateStore{ logger: s.logger, @@ -170,7 +165,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { } // Create a new change tracker DB that does not publish or track changes - store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges, nil) + store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges, 0) snap := &StateSnapshot{ StateStore: store, @@ -253,14 +248,14 @@ func (s *StateStore) AbandonCh() <-chan struct{} { // Abandon is used to signal that the given state store has been abandoned. // Calling this more than one time will panic. func (s *StateStore) Abandon() { - s.StopEventPublisher() + s.StopEventBroker() close(s.abandonCh) } -// StopStopEventPublisher calls the cancel func for the state stores event +// StopStopEventBroker calls the cancel func for the state stores event // publisher. It should be called during server shutdown. -func (s *StateStore) StopEventPublisher() { - s.stopEventPublisher() +func (s *StateStore) StopEventBroker() { + s.stopEventBroker() } // QueryFn is the definition of a function that can be used to implement a basic @@ -340,7 +335,7 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64 } } - // Update the status of dmsgType structs.MessageType by the plan. + // Update the status of deployments effected by the plan. if len(results.DeploymentUpdates) != 0 { s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn) } @@ -790,6 +785,7 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri // UpsertNodeMsgType is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain/eligibility which is set by the scheduler. +// TODO(drew) remove this and update all test callers of UpsertNode to use msgType func (s *StateStore) UpsertNodeMsgType(msgType structs.MessageType, index uint64, node *structs.Node) error { txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() @@ -972,7 +968,7 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update // BatchUpdateNodeDrain is used to update the drain of a node set of nodes func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() for node, update := range updates { if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil { @@ -5900,8 +5896,8 @@ func (s *StateRestore) Abort() { } // Commit is used to commit the restore operation -func (s *StateRestore) Commit() { - s.txn.Commit() +func (s *StateRestore) Commit() error { + return s.txn.Commit() } // NodeRestore is used to restore a node diff --git a/nomad/state/state_store_events_test.go b/nomad/state/state_store_events_test.go index 08a5b88b770..146b792e972 100644 --- a/nomad/state/state_store_events_test.go +++ b/nomad/state/state_store_events_test.go @@ -27,7 +27,7 @@ func TestStateStore_Events_OnEvict(t *testing.T) { } s := TestStateStoreCfg(t, cfg) - _, err := s.EventPublisher() + _, err := s.EventBroker() require.NoError(t, err) // force 3 evictions @@ -85,7 +85,7 @@ func TestStateStore_Events_OnEvict_Missing(t *testing.T) { } s := TestStateStoreCfg(t, cfg) - _, err := s.EventPublisher() + _, err := s.EventBroker() require.NoError(t, err) getEvents := func() []*structs.Events { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1c0f345f57e..7fed591a4a4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1368,7 +1368,7 @@ func TestStateStore_RestoreNode(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) @@ -2441,7 +2441,7 @@ func TestStateStore_RestoreJob(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, job.ID) @@ -2711,7 +2711,7 @@ func TestStateStore_RestorePeriodicLaunch(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.PeriodicLaunchByID(ws, job.Namespace, job.ID) @@ -2743,7 +2743,7 @@ func TestStateStore_RestoreJobVersion(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, job.Version) @@ -2775,7 +2775,7 @@ func TestStateStore_RestoreDeployment(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.DeploymentByID(ws, d.ID) @@ -2815,7 +2815,7 @@ func TestStateStore_RestoreJobSummary(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.JobSummaryByID(ws, job.Namespace, job.ID) @@ -3617,7 +3617,7 @@ func TestStateStore_RestoreCSIPlugin(t *testing.T) { err = restore.CSIPluginRestore(plugin) require.NoError(err) - restore.Commit() + require.NoError(restore.Commit()) ws := memdb.NewWatchSet() out, err := state.CSIPluginByID(ws, plugin.ID) @@ -3731,7 +3731,7 @@ func TestStateStore_RestoreIndex(t *testing.T) { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) out, err := state.Index("jobs") if err != nil { @@ -4397,7 +4397,7 @@ func TestStateStore_RestoreEval(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.EvalByID(ws, eval.ID) @@ -6033,7 +6033,7 @@ func TestStateStore_RestoreAlloc(t *testing.T) { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc.ID) @@ -7701,7 +7701,7 @@ func TestStateStore_RestoreVaultAccessor(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.VaultAccessor(ws, a.Accessor) @@ -7902,7 +7902,7 @@ func TestStateStore_RestoreSITokenAccessor(t *testing.T) { err = restore.SITokenAccessorRestore(a1) r.NoError(err) - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() result, err := state.SITokenAccessor(ws, a1.AccessorID) @@ -8382,7 +8382,7 @@ func TestStateStore_RestoreACLPolicy(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.ACLPolicyByName(ws, policy.Name) @@ -8441,7 +8441,7 @@ func TestStateStore_RestoreACLToken(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.ACLTokenByAccessorID(ws, token.AccessorID) @@ -8470,7 +8470,7 @@ func TestStateStore_SchedulerConfig(t *testing.T) { err = restore.SchedulerConfigRestore(schedConfig) require.Nil(err) - restore.Commit() + require.NoError(restore.Commit()) modIndex, out, err := state.SchedulerConfig() require.Nil(err) @@ -8510,7 +8510,7 @@ func TestStateStore_ClusterMetadataRestore(t *testing.T) { err = restore.ClusterMetadataRestore(meta) require.NoError(err) - restore.Commit() + require.NoError(restore.Commit()) out, err := state.ClusterMetadata(nil) require.NoError(err) @@ -8530,7 +8530,7 @@ func TestStateStore_RestoreScalingPolicy(t *testing.T) { err = restore.ScalingPolicyRestore(scalingPolicy) require.NoError(err) - restore.Commit() + require.NoError(restore.Commit()) ws := memdb.NewWatchSet() out, err := state.ScalingPolicyByID(ws, scalingPolicy.ID) @@ -9543,7 +9543,7 @@ func TestStateStore_RestoreScalingEvents(t *testing.T) { err = restore.ScalingEventsRestore(jobScalingEvents) require.NoError(err) - restore.Commit() + require.NoError(restore.Commit()) ws := memdb.NewWatchSet() out, _, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace, diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go index 42c9dc3a4c2..419b27f9837 100644 --- a/nomad/stream/event_buffer.go +++ b/nomad/stream/event_buffer.go @@ -50,19 +50,17 @@ type eventBuffer struct { head atomic.Value tail atomic.Value - maxSize int64 - maxItemTTL time.Duration - onEvict EvictCallbackFn + maxSize int64 + onEvict EvictCallbackFn } // newEventBuffer creates an eventBuffer ready for use. -func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackFn) *eventBuffer { +func newEventBuffer(size int64, onEvict EvictCallbackFn) *eventBuffer { zero := int64(0) b := &eventBuffer{ - maxSize: size, - size: &zero, - maxItemTTL: maxItemTTL, - onEvict: onEvict, + maxSize: size, + size: &zero, + onEvict: onEvict, } item := newBufferItem(&structs.Events{Index: 0, Events: nil}) @@ -77,7 +75,7 @@ func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackF // watchers. After calling append, the caller must not make any further // mutations to the events as they may have been exposed to subscribers in other // goroutines. Append only supports a single concurrent caller and must be -// externally synchronized with other Append, or prune calls. +// externally synchronized with other Append calls. func (b *eventBuffer) Append(events *structs.Events) { b.appendItem(newBufferItem(events)) } @@ -104,7 +102,7 @@ func (b *eventBuffer) appendItem(item *bufferItem) { } func newSentinelItem() *bufferItem { - return newBufferItem(&structs.Events{Index: 0, Events: nil}) + return newBufferItem(&structs.Events{}) } // advanceHead drops the current Head buffer item and notifies readers @@ -191,25 +189,6 @@ func (b *eventBuffer) Len() int { return int(atomic.LoadInt64(b.size)) } -// prune advances the head of the buffer until the head buffer item TTL -// is no longer expired. It should be externally synchronized as it mutates -// the buffer of items. -func (b *eventBuffer) prune() { - now := time.Now() - for { - head := b.Head() - if b.Len() == 0 { - return - } - - if now.Sub(head.createdAt) > b.maxItemTTL { - b.advanceHead() - } else { - return - } - } -} - // bufferItem represents a set of events published by a single raft operation. // The first item returned by a newly constructed buffer will have nil Events. // It is a sentinel value which is used to wait on the next events via Next. diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go index 84f0b852473..45501d12372 100644 --- a/nomad/stream/event_buffer_test.go +++ b/nomad/stream/event_buffer_test.go @@ -17,7 +17,7 @@ func TestEventBufferFuzz(t *testing.T) { nReaders := 1000 nMessages := 1000 - b := newEventBuffer(1000, DefaultTTL, nil) + b := newEventBuffer(1000, nil) // Start a write goroutine that will publish 10000 messages with sequential // indexes and some jitter in timing (to allow clients to "catch up" and block @@ -85,7 +85,7 @@ func TestEventBufferFuzz(t *testing.T) { } func TestEventBuffer_Slow_Reader(t *testing.T) { - b := newEventBuffer(10, DefaultTTL, nil) + b := newEventBuffer(10, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -114,7 +114,7 @@ func TestEventBuffer_Slow_Reader(t *testing.T) { } func TestEventBuffer_Size(t *testing.T) { - b := newEventBuffer(100, DefaultTTL, nil) + b := newEventBuffer(100, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -126,11 +126,11 @@ func TestEventBuffer_Size(t *testing.T) { require.Equal(t, 10, b.Len()) } -// TestEventBuffer_Prune_AllOld tests the behavior when all items -// are past their TTL, the event buffer should prune down to the last message -// and hold onto the last item. -func TestEventBuffer_Prune_AllOld(t *testing.T) { - b := newEventBuffer(100, 1*time.Second, nil) +// TestEventBuffer_Emptying_Buffer tests the behavior when all items +// are removed, the event buffer should advance its head down to the last message +// and insert a placeholder sentinel value. +func TestEventBuffer_Emptying_Buffer(t *testing.T) { + b := newEventBuffer(10, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -141,11 +141,11 @@ func TestEventBuffer_Prune_AllOld(t *testing.T) { require.Equal(t, 10, int(b.Len())) - time.Sleep(1 * time.Second) - - // prune old messages, which will bring the event buffer down + // empty the buffer, which will bring the event buffer down // to a single sentinel value - b.prune() + for i := 0; i < 11; i++ { + b.advanceHead() + } // head and tail are now a sentinel value head := b.Head() @@ -203,7 +203,7 @@ func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) { } // buffer starts at index 11 goes to 100 - b := newEventBuffer(100, 1*time.Hour, nil) + b := newEventBuffer(100, nil) for i := 11; i <= 100; i++ { e := structs.Event{ @@ -226,7 +226,7 @@ func TestEventBuffer_OnEvict(t *testing.T) { testOnEvict := func(events *structs.Events) { close(called) } - b := newEventBuffer(2, DefaultTTL, testOnEvict) + b := newEventBuffer(2, testOnEvict) // start at 1 since new event buffer is built with a starting sentinel value for i := 1; i < 4; i++ { diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go index f28131dddec..bcf8a8fc52b 100644 --- a/nomad/stream/event_publisher.go +++ b/nomad/stream/event_publisher.go @@ -2,9 +2,11 @@ package stream import ( "context" + "fmt" "sync" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-hclog" @@ -14,48 +16,34 @@ const ( DefaultTTL = 1 * time.Hour ) -type EventPublisherCfg struct { +type EventBrokerCfg struct { EventBufferSize int64 - EventBufferTTL time.Duration Logger hclog.Logger OnEvict EvictCallbackFn } -type EventPublisher struct { - // lock protects the eventbuffer - lock sync.Mutex +type EventBroker struct { + // mu protects the eventbuffer and subscriptions + mu sync.Mutex // eventBuf stores a configurable amount of events in memory eventBuf *eventBuffer - logger hclog.Logger - subscriptions *subscriptions // publishCh is used to send messages from an active txn to a goroutine which // publishes events, so that publishing can happen asynchronously from // the Commit call in the FSM hot path. publishCh chan *structs.Events -} - -type subscriptions struct { - // lock for byToken. If both subscription.lock and EventPublisher.lock need - // to be held, EventPublisher.lock MUST always be acquired first. - lock sync.RWMutex - // byToken is an mapping of active Subscriptions indexed by a token and - // a pointer to the request. - // When the token is modified all subscriptions under that token will be - // reloaded. - // A subscription may be unsubscribed by using the pointer to the request. - byToken map[string]map[*SubscribeRequest]*Subscription + logger hclog.Logger } -func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublisher { - if cfg.EventBufferTTL == 0 { - cfg.EventBufferTTL = 1 * time.Hour - } - +// NewEventBroker returns an EventBroker for publishing change events. +// A goroutine is run in the background to publish events to an event buffer. +// Cancelling the context will shutdown the goroutine to free resources, and stop +// all publishing. +func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker { if cfg.Logger == nil { cfg.Logger = hclog.NewNullLogger() } @@ -65,9 +53,9 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish cfg.EventBufferSize = 100 } - buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL, cfg.OnEvict) - e := &EventPublisher{ - logger: cfg.Logger.Named("event_publisher"), + buffer := newEventBuffer(cfg.EventBufferSize, cfg.OnEvict) + e := &EventBroker{ + logger: cfg.Logger.Named("event_broker"), eventBuf: buffer, publishCh: make(chan *structs.Events, 64), subscriptions: &subscriptions{ @@ -80,21 +68,35 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish return e } -func (e *EventPublisher) Len() int { +// Returns the current length of the event buffer +func (e *EventBroker) Len() int { + e.mu.Lock() + defer e.mu.Unlock() return e.eventBuf.Len() } // Publish events to all subscribers of the event Topic. -func (e *EventPublisher) Publish(events *structs.Events) { +func (e *EventBroker) Publish(events *structs.Events) { if len(events.Events) > 0 { e.publishCh <- events } } -// Subscribe returns a new Subscription for a given request. -func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) { - e.lock.Lock() - defer e.lock.Unlock() +// Subscribe returns a new Subscription for a given request. A Subscription +// will receive an initial empty currentItem value which points to the first item +// in the buffer. This allows the new subscription to call Next() without first checking +// for the current Item. +// +// A Subscription will start at the requested index, or as close as possible to +// the requested index if it is no longer in the buffer. If StartExactlyAtIndex is +// set and the index is no longer in the buffer or not yet in the buffer an error +// will be returned. +// +// When a caller is finished with the subscription it must call Subscription.Unsubscribe +// to free ACL tracking resources. TODO(drew) ACL tracking +func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) { + e.mu.Lock() + defer e.mu.Unlock() var head *bufferItem var offset int @@ -103,8 +105,11 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) } else { head = e.eventBuf.Head() } - if offset > 0 { - e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index)) + if offset > 0 && req.StartExactlyAtIndex { + return nil, fmt.Errorf("requested index not in buffer") + } else if offset > 0 { + metrics.SetGauge([]string{"nomad", "event_broker", "subscription", "request_offset"}, float32(offset)) + e.logger.Debug("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index)) } // Empty head so that calling Next on sub @@ -112,17 +117,18 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) start.link.next.Store(head) close(start.link.ch) - sub := newSubscription(req, start, e.subscriptions.unsubscribe(req)) + sub := newSubscription(req, start, e.subscriptions.unsubscribeFn(req)) e.subscriptions.add(req, sub) return sub, nil } -func (e *EventPublisher) CloseAll() { +// CloseAll closes all subscriptions +func (e *EventBroker) CloseAll() { e.subscriptions.closeAll() } -func (e *EventPublisher) handleUpdates(ctx context.Context) { +func (e *EventBroker) handleUpdates(ctx context.Context) { for { select { case <-ctx.Done(): @@ -135,16 +141,29 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) { } // sendEvents sends the given events to the publishers event buffer. -func (e *EventPublisher) sendEvents(update *structs.Events) { - e.lock.Lock() - defer e.lock.Unlock() +func (e *EventBroker) sendEvents(update *structs.Events) { + e.mu.Lock() + defer e.mu.Unlock() e.eventBuf.Append(update) } +type subscriptions struct { + // mu for byToken. If both subscription.mu and EventBroker.mu need + // to be held, EventBroker mutex MUST always be acquired first. + mu sync.RWMutex + + // byToken is an mapping of active Subscriptions indexed by a token and + // a pointer to the request. + // When the token is modified all subscriptions under that token will be + // reloaded. + // A subscription may be unsubscribed by using the pointer to the request. + byToken map[string]map[*SubscribeRequest]*Subscription +} + func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { - s.lock.Lock() - defer s.lock.Unlock() + s.mu.Lock() + defer s.mu.Unlock() subsByToken, ok := s.byToken[req.Token] if !ok { @@ -155,8 +174,8 @@ func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { } func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { - s.lock.RLock() - defer s.lock.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() for _, secretID := range tokenSecretIDs { if subs, ok := s.byToken[secretID]; ok { @@ -167,20 +186,29 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { } } -// unsubscribe returns a function that the subscription will call to remove +// unsubscribeFn returns a function that the subscription will call to remove // itself from the subsByToken. // This function is returned as a closure so that the caller doesn't need to keep -// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the +// track of the SubscriptionRequest, and can not accidentally call unsubscribeFn with the // wrong pointer. -func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() { +func (s *subscriptions) unsubscribeFn(req *SubscribeRequest) func() { return func() { - s.lock.Lock() - defer s.lock.Unlock() + s.mu.Lock() + defer s.mu.Unlock() subsByToken, ok := s.byToken[req.Token] if !ok { return } + + sub := subsByToken[req] + if sub == nil { + return + } + + // close the subscription + sub.forceClose() + delete(subsByToken, req) if len(subsByToken) == 0 { delete(s.byToken, req.Token) @@ -189,8 +217,8 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() { } func (s *subscriptions) closeAll() { - s.lock.Lock() - defer s.lock.Unlock() + s.mu.Lock() + defer s.mu.Unlock() for _, byRequest := range s.byToken { for _, sub := range byRequest { diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go index af92d059566..0587a81f520 100644 --- a/nomad/stream/event_publisher_test.go +++ b/nomad/stream/event_publisher_test.go @@ -2,6 +2,7 @@ package stream import ( "context" + "sync/atomic" "testing" "time" @@ -10,16 +11,16 @@ import ( "github.com/stretchr/testify/require" ) -func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { +func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) { subscription := &SubscribeRequest{ Topics: map[structs.Topic][]string{ - "Test": []string{"sub-key"}, + "Test": {"sub-key"}, }, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, EventPublisherCfg{EventBufferSize: 100, EventBufferTTL: DefaultTTL}) + publisher := NewEventBroker(ctx, EventBrokerCfg{EventBufferSize: 100}) sub, err := publisher.Subscribe(subscription) require.NoError(t, err) eventCh := consumeSubscription(ctx, sub) @@ -59,11 +60,11 @@ func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { require.Equal(t, expected, result.Events) } -func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { +func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - publisher := NewEventPublisher(ctx, EventPublisherCfg{}) + publisher := NewEventBroker(ctx, EventBrokerCfg{}) sub1, err := publisher.Subscribe(&SubscribeRequest{}) require.NoError(t, err) @@ -82,6 +83,32 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { require.Equal(t, err, ErrSubscriptionClosed) } +// TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription +// hanlding behavior when ACLs are disabled (request Token is empty). +// Subscriptions are mapped by their request token. when that token is empty, +// the subscriptions should still be handled indeppendtly of each other when +// unssubscribing. +func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + publisher := NewEventBroker(ctx, EventBrokerCfg{}) + + // first subscription, empty token + sub1, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + defer sub1.Unsubscribe() + + // second subscription, empty token + sub2, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + require.NotNil(t, sub2) + + sub1.Unsubscribe() + + require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state)) +} + func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { eventCh := make(chan subNextResult, 1) go func() { diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index cd3befc7ea1..7e7ad092810 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -1,81 +1,63 @@ package stream import ( - "bytes" "context" "encoding/json" "fmt" - "sync" "time" "github.com/hashicorp/nomad/nomad/structs" ) var ( - // NDJsonHeartbeat is the NDJson to send as a heartbeat + // JsonHeartbeat is an empty JSON object to send as a heartbeat // Avoids creating many heartbeat instances - NDJsonHeartbeat = &structs.NDJson{Data: []byte("{}\n")} + JsonHeartbeat = &structs.EventJson{Data: []byte("{}")} ) -// NDJsonStream is used to send new line delimited JSON and heartbeats +// JsonStream is used to send new line delimited JSON and heartbeats // to a destination (out channel) -type NDJsonStream struct { - out chan<- *structs.NDJson +type JsonStream struct { + // ctx is a passed in context used to notify the json stream + // when it should terminate + ctx context.Context + + outCh chan *structs.EventJson // heartbeat is the interval to send heartbeat messages to keep a connection // open. - heartbeat *time.Ticker - - publishCh chan structs.NDJson - exitCh chan struct{} - - l sync.Mutex - running bool + heartbeatTick *time.Ticker } -// NewNNewNDJsonStream creates a new NDJson stream that will output NDJson structs -// to the passed output channel -func NewNDJsonStream(out chan<- *structs.NDJson, heartbeat time.Duration) *NDJsonStream { - return &NDJsonStream{ - out: out, - heartbeat: time.NewTicker(heartbeat), - exitCh: make(chan struct{}), - publishCh: make(chan structs.NDJson), +// NewJsonStream creates a new json stream that will output Json structs +// to the passed output channel. The constructor starts a goroutine +// to begin heartbeating on its set interval. +func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream { + s := &JsonStream{ + ctx: ctx, + outCh: make(chan *structs.EventJson, 10), + heartbeatTick: time.NewTicker(heartbeat), } -} -// Run starts a long lived goroutine that handles sending -// heartbeats and processed json objects to the streams out channel as well -func (n *NDJsonStream) Run(ctx context.Context) { - n.l.Lock() - if n.running { - return - } - n.running = true - n.l.Unlock() + go s.heartbeat() - go n.run(ctx) + return s } -func (n *NDJsonStream) run(ctx context.Context) { - defer func() { - n.l.Lock() - n.running = false - n.l.Unlock() - close(n.exitCh) - }() +func (n *JsonStream) OutCh() chan *structs.EventJson { + return n.outCh +} +func (n *JsonStream) heartbeat() { for { select { - case <-ctx.Done(): + case <-n.ctx.Done(): return - case msg := <-n.publishCh: - n.out <- msg.Copy() - case <-n.heartbeat.C: + case <-n.heartbeatTick.C: // Send a heartbeat frame select { - case n.out <- NDJsonHeartbeat: - case <-ctx.Done(): + case n.outCh <- JsonHeartbeat: + case <-n.ctx.Done(): return } } @@ -84,19 +66,20 @@ func (n *NDJsonStream) run(ctx context.Context) { // Send encodes an object into Newline delimited json. An error is returned // if json encoding fails or if the stream is no longer running. -func (n *NDJsonStream) Send(obj interface{}) error { - n.l.Lock() - defer n.l.Unlock() +func (n *JsonStream) Send(v interface{}) error { + if n.ctx.Err() != nil { + return n.ctx.Err() + } - buf := bytes.NewBuffer(nil) - if err := json.NewEncoder(buf).Encode(obj); err != nil { - return fmt.Errorf("marshaling json for stream: %w", err) + buf, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("error marshaling json for stream: %w", err) } select { - case n.publishCh <- structs.NDJson{Data: buf.Bytes()}: - case <-n.exitCh: - return fmt.Errorf("stream is no longer running") + case <-n.ctx.Done(): + return fmt.Errorf("error stream is no longer running: %w", err) + case n.outCh <- &structs.EventJson{Data: buf}: } return nil diff --git a/nomad/stream/ndjson_test.go b/nomad/stream/ndjson_test.go index 589cde1a719..0c7c4de787d 100644 --- a/nomad/stream/ndjson_test.go +++ b/nomad/stream/ndjson_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -14,15 +13,14 @@ type testObj struct { Name string `json:"name"` } -func TestNDJson(t *testing.T) { +func TestJsonStream(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *structs.NDJson) - s := NewNDJsonStream(out, 1*time.Second) - s.Run(ctx) + s := NewJsonStream(ctx, 1*time.Second) + out := s.OutCh() require.NoError(t, s.Send(testObj{Name: "test"})) @@ -30,25 +28,31 @@ func TestNDJson(t *testing.T) { var expected bytes.Buffer expected.Write([]byte(`{"name":"test"}`)) - expected.Write([]byte("\n")) require.Equal(t, expected.Bytes(), out1.Data) select { - case _ = <-out: - t.Fatalf("Did not expect another message") + case msg := <-out: + require.Failf(t, "Did not expect another message", "%#v", msg) case <-time.After(100 * time.Millisecond): } + + require.NoError(t, s.Send(testObj{Name: "test2"})) + + out2 := <-out + expected.Reset() + + expected.Write([]byte(`{"name":"test2"}`)) + require.Equal(t, expected.Bytes(), out2.Data) + } -func TestNDJson_Send_After_Stop(t *testing.T) { +func TestJson_Send_After_Stop(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *structs.NDJson) - s := NewNDJsonStream(out, 1*time.Second) - s.Run(ctx) + s := NewJsonStream(ctx, 1*time.Second) // stop the stream cancel() @@ -57,17 +61,16 @@ func TestNDJson_Send_After_Stop(t *testing.T) { require.Error(t, s.Send(testObj{})) } -func TestNDJson_HeartBeat(t *testing.T) { +func TestJson_HeartBeat(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *structs.NDJson) - s := NewNDJsonStream(out, 10*time.Millisecond) - s.Run(ctx) + s := NewJsonStream(ctx, 10*time.Millisecond) + out := s.OutCh() heartbeat := <-out - require.Equal(t, NDJsonHeartbeat, heartbeat) + require.Equal(t, JsonHeartbeat, heartbeat) } diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index b85838f3d9c..70b133a6382 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -9,7 +9,6 @@ import ( ) const ( - AllKeys = "*" // subscriptionStateOpen is the default state of a subscription. An open // subscription may receive new events. subscriptionStateOpen uint32 = 0 @@ -25,7 +24,7 @@ const ( var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe") type Subscription struct { - // state is accessed atomically 0 means open, 1 means closed with reload + // state must be accessed atomically 0 means open, 1 means closed with reload state uint32 req *SubscribeRequest @@ -35,10 +34,10 @@ type Subscription struct { currentItem *bufferItem // forceClosed is closed when forceClose is called. It is used by - // EventPublisher to cancel Next(). + // EventBroker to cancel Next(). forceClosed chan struct{} - // unsub is a function set by EventPublisher that is called to free resources + // unsub is a function set by EventBroker that is called to free resources // when the subscription is no longer needed. // It must be safe to call the function from multiple goroutines and the function // must be idempotent. @@ -51,6 +50,12 @@ type SubscribeRequest struct { Namespace string Topics map[structs.Topic][]string + + // StartExactlyAtIndex specifies if a subscription needs to + // start exactly at the requested Index. If set to false, + // the closest index in the buffer will be returned if there is not + // an exact match + StartExactlyAtIndex bool } func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription { @@ -124,11 +129,11 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { var count int for _, e := range events { - _, allTopics := req.Topics[AllKeys] + _, allTopics := req.Topics[structs.TopicAll] if _, ok := req.Topics[e.Topic]; ok || allTopics { var keys []string if allTopics { - keys = req.Topics[AllKeys] + keys = req.Topics[structs.TopicAll] } else { keys = req.Topics[e.Topic] } @@ -136,9 +141,7 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { continue } for _, k := range keys { - // if req.Namespace != "" && e.Namespace != "" && e.Namespace == - // if e.Namespace != "" && e.Namespace - if e.Key == k || k == AllKeys { + if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) { count++ } } @@ -156,11 +159,11 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { // Return filtered events result := make([]structs.Event, 0, count) for _, e := range events { - _, allTopics := req.Topics[AllKeys] + _, allTopics := req.Topics[structs.TopicAll] if _, ok := req.Topics[e.Topic]; ok || allTopics { var keys []string if allTopics { - keys = req.Topics[AllKeys] + keys = req.Topics[structs.TopicAll] } else { keys = req.Topics[e.Topic] } @@ -169,7 +172,7 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { continue } for _, k := range keys { - if e.Key == k || k == AllKeys { + if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) { result = append(result, e) } } @@ -177,3 +180,12 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { } return result } + +func filterKeyContains(filterKeys []string, key string) bool { + for _, fk := range filterKeys { + if fk == key { + return true + } + } + return false +} diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go index a83be880986..c54bc4fa557 100644 --- a/nomad/stream/subscription_test.go +++ b/nomad/stream/subscription_test.go @@ -8,10 +8,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestSubscription(t *testing.T) { - -} - func TestFilter_AllTopics(t *testing.T) { events := make([]structs.Event, 0, 5) events = append(events, structs.Event{Topic: "Test", Key: "One"}, structs.Event{Topic: "Test", Key: "Two"}) @@ -112,3 +108,22 @@ func TestFilter_Namespace(t *testing.T) { require.Equal(t, cap(actual), 2) } + +func TestFilter_FilterKeys(t *testing.T) { + events := make([]structs.Event, 0, 5) + events = append(events, structs.Event{Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[structs.Topic][]string{ + "Test": {"extra-key"}, + }, + Namespace: "foo", + } + actual := filter(req, events) + expected := []structs.Event{ + {Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, + } + require.Equal(t, expected, actual) + + require.Equal(t, cap(actual), 1) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 75db4c35cab..143670eee16 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10708,8 +10708,7 @@ type ACLTokenUpsertResponse struct { WriteMeta } -// EEventStreamRequest is used to stream events from a servers -// EventPublisher +// EventStreamRequest is used to stream events from a servers EventBroker type EventStreamRequest struct { Topics map[Topic][]string Index int @@ -10719,7 +10718,7 @@ type EventStreamRequest struct { type EventStreamWrapper struct { Error *RpcError - Event *NDJson + Event *EventJson } // RpcError is used for serializing errors with a potential error code @@ -10741,28 +10740,54 @@ func (r *RpcError) Error() string { type Topic string +const ( + TopicDeployment Topic = "Deployment" + TopicEval Topic = "Eval" + TopicAlloc Topic = "Alloc" + TopicJob Topic = "Job" + TopicNode Topic = "Node" + TopicAll Topic = "*" +) + +// Event represents a change in Nomads state. type Event struct { - Topic Topic - Type string - Key string - Namespace string + // Topic represeents the primary object for the event + Topic Topic + + // Type is a short string representing the reason for the event + Type string + + // Key is the primary identifier of the Event, The involved objects ID + Key string + + // Namespace is the namespace of the object, If the object is not namespace + // aware (Node) it is left blank + Namespace string + + // FilterKeys are a set of additional related keys that are used to include + // events during filtering. FilterKeys []string - Index uint64 - Payload interface{} + + // Index is the raft index that corresponds to the event + Index uint64 + + // Payload is the Event itself see state/events.go for a list of events + Payload interface{} } +// Events is a wrapper that contains a set of events for a given index. type Events struct { Index uint64 Events []Event } -// NNDJson is a wrapper for a Newline Delimited JSON object -type NDJson struct { +// EventJson is a wrapper for a JSON object +type EventJson struct { Data []byte } -func (j *NDJson) Copy() *NDJson { - n := new(NDJson) +func (j *EventJson) Copy() *EventJson { + n := new(EventJson) *n = *j n.Data = make([]byte, len(j.Data)) copy(n.Data, j.Data) diff --git a/nomad/testing.go b/nomad/testing.go index edb2d1c8e49..3b9d4bf4b8b 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -47,7 +47,7 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) { config.Logger = testlog.HCLogger(t) config.Build = version.Version + "+unittest" config.DevMode = true - config.EnableEventPublisher = true + config.EnableEventBroker = true config.BootstrapExpect = 1 nodeNum := atomic.AddUint32(&nodeNumber, 1) config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum) diff --git a/vendor/github.com/hashicorp/nomad/api/event.go b/vendor/github.com/hashicorp/nomad/api/event.go index dce3c265fe8..f89e222848f 100644 --- a/vendor/github.com/hashicorp/nomad/api/event.go +++ b/vendor/github.com/hashicorp/nomad/api/event.go @@ -6,7 +6,7 @@ import ( "fmt" ) -// Ebvents is a set of events for a corresponding index. Events returned for the +// Events is a set of events for a corresponding index. Events returned for the // index depend on which topics are subscribed to when a request is made. type Events struct { Index uint64 @@ -47,11 +47,11 @@ func (c *Client) EventStream() *EventStream { // Stream establishes a new subscription to Nomad's event stream and streams // results back to the returned channel. func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) { - r, err := e.client.newRequest("GET", "/v1/event/stream") if err != nil { return nil, err } + q = q.WithContext(ctx) r.setQueryOptions(q) // Build topic query params @@ -78,11 +78,11 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind // Decode next newline delimited json of events var events Events if err := dec.Decode(&events); err != nil { + // set error and fallthrough to + // select eventsCh events = Events{Err: err} - eventsCh <- &events - return } - if events.IsHeartbeat() { + if events.Err == nil && events.IsHeartbeat() { continue }