diff --git a/api/event.go b/api/event.go
index 2bf08fba579..f89e222848f 100644
--- a/api/event.go
+++ b/api/event.go
@@ -47,11 +47,11 @@ func (c *Client) EventStream() *EventStream {
 // Stream establishes a new subscription to Nomad's event stream and streams
 // results back to the returned channel.
 func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) {
-
 	r, err := e.client.newRequest("GET", "/v1/event/stream")
 	if err != nil {
 		return nil, err
 	}
+	q = q.WithContext(ctx)
 	r.setQueryOptions(q)
 
 	// Build topic query params
@@ -82,7 +82,7 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind
 			// select eventsCh
 			events = Events{Err: err}
 		}
-		if events.IsHeartbeat() {
+		if events.Err == nil && events.IsHeartbeat() {
 			continue
 		}
diff --git a/api/event_test.go b/api/event_test.go
index 9bd4ae3a4dc..80820e97a80 100644
--- a/api/event_test.go
+++ b/api/event_test.go
@@ -34,20 +34,15 @@ func TestEvent_Stream(t *testing.T) {
 	streamCh, err := events.Stream(ctx, topics, 0, q)
 	require.NoError(t, err)
 
-OUTER:
-	for {
-		select {
-		case event := <-streamCh:
-			if event.Err != nil {
-				require.Fail(t, err.Error())
-			}
-			require.Equal(t, len(event.Events), 1)
-			require.Equal(t, "Eval", string(event.Events[0].Topic))
-
-			break OUTER
-		case <-time.After(5 * time.Second):
-			require.Fail(t, "failed waiting for event stream event")
+	select {
+	case event := <-streamCh:
+		if event.Err != nil {
+			require.Fail(t, event.Err.Error())
 		}
+		require.Equal(t, len(event.Events), 1)
+		require.Equal(t, "Eval", string(event.Events[0].Topic))
+	case <-time.After(5 * time.Second):
+		require.Fail(t, "failed waiting for event stream event")
 	}
 }
 
@@ -76,4 +71,43 @@ func TestEvent_Stream_Err_InvalidQueryParam(t *testing.T) {
 
 	_, err = events.Stream(ctx, topics, 0, q)
 	require.Error(t, err)
+	require.Contains(t, err.Error(), "400")
+	require.Contains(t, err.Error(), "Invalid key value pair")
+}
+
+func TestEvent_Stream_CloseCtx(t *testing.T) {
+	t.Parallel()
+
+	c, s := makeClient(t, nil, nil)
+	defer s.Stop()
+
+	// register job to generate events
+	jobs := c.Jobs()
+	job := testJob()
+	resp2, _, err := jobs.Register(job, nil)
+	require.Nil(t, err)
+	require.NotNil(t, resp2)
+
+	// build event stream request
+	events := c.EventStream()
+	q := &QueryOptions{}
+	topics := map[Topic][]string{
+		"Eval": {"*"},
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	streamCh, err := events.Stream(ctx, topics, 0, q)
+	require.NoError(t, err)
+
+	// cancel the request
+	cancel()
+
+	select {
+	case event, ok := <-streamCh:
+		require.False(t, ok)
+		require.Nil(t, event)
+	case <-time.After(5 * time.Second):
+		require.Fail(t, "failed waiting for event stream event")
+	}
 }
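The `Stream` change above threads the caller's `context.Context` into the underlying HTTP request, so cancelling the context tears the stream down and closes the returned channel, which is exactly what `TestEvent_Stream_CloseCtx` asserts. A minimal consumer sketch, assuming only a locally running agent and the canonical `github.com/hashicorp/nomad/api` import path; the topic filter is an example, not part of this diff:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Cancelling ctx aborts the request; the api package then closes eventCh.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	topics := map[api.Topic][]string{"Eval": {"*"}}
	eventCh, err := client.EventStream().Stream(ctx, topics, 0, &api.QueryOptions{})
	if err != nil {
		log.Fatal(err)
	}

	for events := range eventCh {
		if events.Err != nil {
			log.Fatal(events.Err) // stream-level error, e.g. a 400 on bad topics
		}
		for _, e := range events.Events {
			fmt.Printf("topic=%s type=%s index=%d\n", e.Topic, e.Type, e.Index)
		}
	}
}
```

Note how the `events.Err == nil && events.IsHeartbeat()` guard above keeps error frames from being silently skipped as heartbeats; consumers still need the `events.Err` check shown here.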
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 54de24470f6..e9d9f882bd5 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -243,11 +243,14 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
 	if agentConfig.Server.UpgradeVersion != "" {
 		conf.UpgradeVersion = agentConfig.Server.UpgradeVersion
 	}
-	if agentConfig.Server.EnableEventPublisher != nil {
-		conf.EnableEventPublisher = *agentConfig.Server.EnableEventPublisher
+	if agentConfig.Server.EnableEventBroker != nil {
+		conf.EnableEventBroker = *agentConfig.Server.EnableEventBroker
 	}
-	if agentConfig.Server.EventBufferSize > 0 {
-		conf.EventBufferSize = int64(agentConfig.Server.EventBufferSize)
+	if agentConfig.Server.EventBufferSize != nil {
+		conf.EventBufferSize = int64(*agentConfig.Server.EventBufferSize)
+	}
+	if agentConfig.Server.DurableEventCount != nil {
+		conf.DurableEventCount = int64(*agentConfig.Server.DurableEventCount)
 	}
 	if agentConfig.Autopilot != nil {
 		if agentConfig.Autopilot.CleanupDeadServers != nil {
diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go
index 1ac90044465..5c71a70b3c6 100644
--- a/command/agent/agent_test.go
+++ b/command/agent/agent_test.go
@@ -57,7 +57,8 @@ func TestAgent_ServerConfig(t *testing.T) {
 	out, err := a.serverConfig()
 	require.NoError(t, err)
 
-	require.True(t, out.EnableEventPublisher)
+	require.True(t, out.EnableEventBroker)
+	require.Equal(t, int64(100), out.DurableEventCount)
 
 	serfAddr := out.SerfConfig.MemberlistConfig.AdvertiseAddr
 	require.Equal(t, "127.0.0.1", serfAddr)
diff --git a/command/agent/config.go b/command/agent/config.go
index 9665fcaa9ad..f9690efffab 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -484,18 +484,18 @@ type ServerConfig struct {
 	// This value is ignored.
 	DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"`
 
-	// EnableEventPublisher configures whether this server's state store
+	// EnableEventBroker configures whether this server's state store
 	// will generate events for its event stream.
-	EnableEventPublisher *bool `hcl:"enable_event_publisher"`
+	EnableEventBroker *bool `hcl:"enable_event_broker"`
 
 	// EventBufferSize configure the amount of events to be held in memory.
-	// If EnableEventPublisher is set to true, the minimum allowable value
+	// If EnableEventBroker is set to true, the minimum allowable value
 	// for the EventBufferSize is 1.
-	EventBufferSize int `hcl:"event_buffer_size"`
+	EventBufferSize *int `hcl:"event_buffer_size"`
 
 	// DurableEventCount specifies the amount of events to persist during snapshot generation.
 	// A count of 0 signals that no events should be persisted.
-	DurableEventCount int `hcl:"durable_event_count"`
+	DurableEventCount *int `hcl:"durable_event_count"`
 
 	// ExtraKeysHCL is used by hcl to surface unexpected keys
 	ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
@@ -887,11 +887,11 @@ func DefaultConfig() *Config {
 			BindWildcardDefaultHostNetwork: true,
 		},
 		Server: &ServerConfig{
-			Enabled:              false,
-			EnableEventPublisher: helper.BoolToPtr(true),
-			EventBufferSize:      100,
-			DurableEventCount:    100,
-			StartJoin:            []string{},
+			Enabled:           false,
+			EnableEventBroker: helper.BoolToPtr(true),
+			EventBufferSize:   helper.IntToPtr(100),
+			DurableEventCount: helper.IntToPtr(100),
+			StartJoin:         []string{},
 			ServerJoin: &ServerJoin{
 				RetryJoin:        []string{},
 				RetryInterval:    30 * time.Second,
@@ -1415,15 +1415,15 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
 		result.ServerJoin = result.ServerJoin.Merge(b.ServerJoin)
 	}
 
-	if b.EnableEventPublisher != nil {
-		result.EnableEventPublisher = b.EnableEventPublisher
+	if b.EnableEventBroker != nil {
+		result.EnableEventBroker = b.EnableEventBroker
 	}
 
-	if b.EventBufferSize != 0 {
+	if b.EventBufferSize != nil {
 		result.EventBufferSize = b.EventBufferSize
 	}
 
-	if b.DurableEventCount != 0 {
+	if b.DurableEventCount != nil {
 		result.DurableEventCount = b.DurableEventCount
 	}
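The switch from plain `int` fields to pointers in `ServerConfig` is what makes `durable_event_count = 0` expressible: `nil` now means "not set, fall back to the default", while an explicit zero survives `Merge`. A standalone sketch of that semantics; the types and helper here are illustrative stand-ins, not the agent's:

```go
package main

import "fmt"

type serverCfg struct {
	EventBufferSize   *int
	DurableEventCount *int
}

func intToPtr(i int) *int { return &i }

// merge mirrors the pointer-based precedence above: b wins only for fields
// it actually set.
func merge(a, b serverCfg) serverCfg {
	result := a
	if b.EventBufferSize != nil {
		result.EventBufferSize = b.EventBufferSize
	}
	if b.DurableEventCount != nil {
		result.DurableEventCount = b.DurableEventCount
	}
	return result
}

func main() {
	defaults := serverCfg{EventBufferSize: intToPtr(100), DurableEventCount: intToPtr(100)}
	// With the old int fields, an explicit `durable_event_count = 0` was
	// indistinguishable from "unset" and the default won; with pointers the
	// explicit 0 is preserved.
	user := serverCfg{DurableEventCount: intToPtr(0)}
	merged := merge(defaults, user)
	fmt.Println(*merged.EventBufferSize, *merged.DurableEventCount) // 100 0
}
```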
-	DurableEventCount int `hcl:"durable_event_count"`
+	DurableEventCount *int `hcl:"durable_event_count"`
diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go
index 8999fa28120..771ff19731f 100644
--- a/command/agent/config_parse_test.go
+++ b/command/agent/config_parse_test.go
@@ -122,9 +122,9 @@ var basicConfig = &Config{
 		RedundancyZone: "foo",
 		UpgradeVersion: "0.8.0",
 		EncryptKey:     "abc",
-		EnableEventPublisher: helper.BoolToPtr(false),
-		EventBufferSize:      200,
-		DurableEventCount:    100,
+		EnableEventBroker: helper.BoolToPtr(false),
+		EventBufferSize:   helper.IntToPtr(200),
+		DurableEventCount: helper.IntToPtr(0),
 		ServerJoin: &ServerJoin{
 			RetryJoin:     []string{"1.1.1.1", "2.2.2.2"},
 			RetryInterval: time.Duration(15) * time.Second,
diff --git a/command/agent/config_test.go b/command/agent/config_test.go
index 59aa42e0572..4e0aa934609 100644
--- a/command/agent/config_test.go
+++ b/command/agent/config_test.go
@@ -138,7 +138,9 @@ func TestConfig_Merge(t *testing.T) {
 			MaxHeartbeatsPerSecond: 30.0,
 			RedundancyZone:         "foo",
 			UpgradeVersion:         "foo",
-			EnableEventPublisher:   helper.BoolToPtr(false),
+			EnableEventBroker:      helper.BoolToPtr(false),
+			EventBufferSize:        helper.IntToPtr(0),
+			DurableEventCount:      helper.IntToPtr(0),
 		},
 		ACL: &ACLConfig{
 			Enabled: true,
@@ -329,7 +331,9 @@ func TestConfig_Merge(t *testing.T) {
 			NonVotingServer:        true,
 			RedundancyZone:         "bar",
 			UpgradeVersion:         "bar",
-			EnableEventPublisher:   helper.BoolToPtr(true),
+			EnableEventBroker:      helper.BoolToPtr(true),
+			DurableEventCount:      helper.IntToPtr(100),
+			EventBufferSize:        helper.IntToPtr(100),
 		},
 		ACL: &ACLConfig{
 			Enabled: true,
@@ -1166,40 +1170,57 @@ func TestTelemetry_Parse(t *testing.T) {
 	require.True(config.Telemetry.DisableDispatchedJobSummaryMetrics)
 }
 
-func TestEventPublisher_Parse(t *testing.T) {
+func TestEventBroker_Parse(t *testing.T) {
 
 	require := require.New(t)
-
 	{
 		a := &ServerConfig{
-			EnableEventPublisher: helper.BoolToPtr(false),
+			EnableEventBroker: helper.BoolToPtr(false),
+			EventBufferSize:   helper.IntToPtr(0),
+			DurableEventCount: helper.IntToPtr(0),
 		}
 		b := DefaultConfig().Server
-		b.EnableEventPublisher = nil
+		b.EnableEventBroker = nil
+		b.EventBufferSize = nil
+		b.DurableEventCount = nil
 
 		result := a.Merge(b)
-		require.Equal(false, *result.EnableEventPublisher)
+		require.Equal(false, *result.EnableEventBroker)
+		require.Equal(0, *result.EventBufferSize)
+		require.Equal(0, *result.DurableEventCount)
 	}
 
 	{
 		a := &ServerConfig{
-			EnableEventPublisher: helper.BoolToPtr(true),
+			EnableEventBroker: helper.BoolToPtr(true),
+			EventBufferSize:   helper.IntToPtr(5000),
+			DurableEventCount: helper.IntToPtr(200),
 		}
 		b := DefaultConfig().Server
-		b.EnableEventPublisher = nil
+		b.EnableEventBroker = nil
+		b.EventBufferSize = nil
+		b.DurableEventCount = nil
 
 		result := a.Merge(b)
-		require.Equal(true, *result.EnableEventPublisher)
+		require.Equal(true, *result.EnableEventBroker)
+		require.Equal(5000, *result.EventBufferSize)
+		require.Equal(200, *result.DurableEventCount)
 	}
 
 	{
 		a := &ServerConfig{
-			EnableEventPublisher: helper.BoolToPtr(false),
+			EnableEventBroker: helper.BoolToPtr(false),
+			EventBufferSize:   helper.IntToPtr(0),
+			DurableEventCount: helper.IntToPtr(0),
 		}
 		b := DefaultConfig().Server
-		b.EnableEventPublisher = helper.BoolToPtr(true)
+		b.EnableEventBroker = helper.BoolToPtr(true)
+		b.EventBufferSize = helper.IntToPtr(20000)
+		b.DurableEventCount = helper.IntToPtr(1000)
 
 		result := a.Merge(b)
-		require.Equal(true, *result.EnableEventPublisher)
+		require.Equal(true, *result.EnableEventBroker)
+		require.Equal(20000, *result.EventBufferSize)
+		require.Equal(1000, *result.DurableEventCount)
 	}
 }
diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go
index af46620acf5..d28ca84e772 100644
--- a/command/agent/event_endpoint.go
+++ b/command/agent/event_endpoint.go
@@ -14,6 +14,7 @@ import (
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/hashicorp/go-msgpack/codec"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"golang.org/x/sync/errgroup"
 )
 
 func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@@ -40,12 +41,12 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
 	resp.Header().Set("Content-Type", "application/json")
 	resp.Header().Set("Cache-Control", "no-cache")
 
+	// Set region, namespace and authtoken to args
 	s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
 
-	// Make the RPC
+	// Determine the RPC handler to use to find a server
 	var handler structs.StreamingRpcHandler
 	var handlerErr error
-
 	if server := s.agent.Server(); server != nil {
 		handler, handlerErr = server.StreamingRpcHandler("Event.Stream")
 	} else if client := s.agent.Client(); client != nil {
@@ -73,57 +74,51 @@ func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (i
 	// Create an output that gets flushed on every write
 	output := ioutils.NewWriteFlusher(resp)
 
-	// create an error channel to handle errors
-	errCh := make(chan HTTPCodedError, 1)
-
-	go func() {
+	// send request and decode events
+	errs, errCtx := errgroup.WithContext(ctx)
+	errs.Go(func() error {
 		defer cancel()
 
 		// Send the request
 		if err := encoder.Encode(args); err != nil {
-			errCh <- CodedError(500, err.Error())
-			return
+			return CodedError(500, err.Error())
 		}
 
 		for {
 			select {
-			case <-ctx.Done():
-				errCh <- nil
-				return
+			case <-errCtx.Done():
+				return nil
 			default:
 			}
 
 			// Decode the response
 			var res structs.EventStreamWrapper
 			if err := decoder.Decode(&res); err != nil {
-				if err == io.EOF || err == io.ErrClosedPipe {
-					return
-				}
-				errCh <- CodedError(500, err.Error())
-				return
+				return CodedError(500, err.Error())
 			}
 			decoder.Reset(httpPipe)
 
 			if err := res.Error; err != nil {
 				if err.Code != nil {
-					errCh <- CodedError(int(*err.Code), err.Error())
-					return
+					return CodedError(int(*err.Code), err.Error())
 				}
 			}
 
 			// Flush json entry to response
 			if _, err := io.Copy(output, bytes.NewReader(res.Event.Data)); err != nil {
-				errCh <- CodedError(500, err.Error())
-				return
+				return CodedError(500, err.Error())
 			}
+			// Each entry is its own new line according to ndjson.org
+			// append new line to each entry
+			fmt.Fprint(output, "\n")
 		}
-	}()
+	})
 
 	// invoke handler
 	handler(handlerPipe)
 	cancel()
-	codedErr := <-errCh
+
+	codedErr := errs.Wait()
 	if codedErr != nil && strings.Contains(codedErr.Error(), io.ErrClosedPipe.Error()) {
 		codedErr = nil
 	}
@@ -144,11 +139,7 @@ func parseEventTopics(query url.Values) (map[structs.Topic][]string, error) {
 			return nil, fmt.Errorf("error parsing topics: %w", err)
 		}
 
-		if topics[structs.Topic(k)] == nil {
-			topics[structs.Topic(k)] = []string{v}
-		} else {
-			topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v)
-		}
+		topics[structs.Topic(k)] = append(topics[structs.Topic(k)], v)
 	}
 	return topics, nil
 }
diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go
index 3b95488d193..c450a917988 100644
--- a/command/agent/event_endpoint_test.go
+++ b/command/agent/event_endpoint_test.go
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
 
@@ -37,7 +38,7 @@ func TestEventStream(t *testing.T) {
 			assert.NoError(t, err)
 		}()
 
-		pub, err := s.Agent.server.State().EventPublisher()
+		pub, err := s.Agent.server.State().EventBroker()
 		require.NoError(t, err)
 
 		pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Payload: testEvent{ID: "123"}}}})
@@ -71,6 +72,8 @@ func TestEventStream_NamespaceQuery(t *testing.T) {
 	httpTest(t, nil, func(s *TestAgent) {
 		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
 		req, err := http.NewRequestWithContext(ctx, "GET", "/v1/event/stream?namespace=foo", nil)
 		require.Nil(t, err)
 		resp := httptest.NewRecorder()
@@ -82,17 +85,17 @@ func TestEventStream_NamespaceQuery(t *testing.T) {
 			assert.NoError(t, err)
 		}()
 
-		pub, err := s.Agent.server.State().EventPublisher()
+		pub, err := s.Agent.server.State().EventBroker()
 		require.NoError(t, err)
 
-		pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Namespace: "bar", Payload: testEvent{ID: "123"}}}})
+		badID := uuid.Generate()
+		pub.Publish(&structs.Events{Index: 100, Events: []structs.Event{{Namespace: "bar", Payload: testEvent{ID: badID}}}})
 		pub.Publish(&structs.Events{Index: 101, Events: []structs.Event{{Namespace: "foo", Payload: testEvent{ID: "456"}}}})
 
 		testutil.WaitForResult(func() (bool, error) {
 			got := resp.Body.String()
 			want := `"Namespace":"foo"`
-			bad := `123`
-			if strings.Contains(got, bad) {
+			if strings.Contains(got, badID) {
 				return false, fmt.Errorf("expected non matching namespace to be filtered, got:%v", got)
 			}
 			if strings.Contains(got, want) {
@@ -101,7 +104,6 @@ func TestEventStream_NamespaceQuery(t *testing.T) {
 			return false, fmt.Errorf("missing expected json, got: %v, want: %v", got, want)
 		}, func(err error) {
-			cancel()
 			require.Fail(t, err.Error())
 		})
diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl
index be35093e980..66d9f9d0f10 100644
--- a/command/agent/testdata/basic.hcl
+++ b/command/agent/testdata/basic.hcl
@@ -130,9 +130,9 @@ server {
   upgrade_version        = "0.8.0"
   encrypt                = "abc"
   raft_multiplier        = 4
-  enable_event_publisher = false
+  enable_event_broker    = false
   event_buffer_size      = 200
-  durable_event_count    = 100
+  durable_event_count    = 0
 
   server_join {
     retry_join = ["1.1.1.1", "2.2.2.2"]
diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json
index d9db2d5d0f1..c20889fc635 100644
--- a/command/agent/testdata/basic.json
+++ b/command/agent/testdata/basic.json
@@ -261,9 +261,9 @@
     "data_dir": "/tmp/data",
     "deployment_gc_threshold": "12h",
     "enabled": true,
-    "enable_event_publisher": false,
+    "enable_event_broker": false,
     "event_buffer_size": 200,
-    "durable_event_count": 100,
+    "durable_event_count": 0,
     "enabled_schedulers": [
       "test"
     ],
diff --git a/nomad/config.go b/nomad/config.go
index 0a2272be884..93abdc6f132 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -78,13 +78,17 @@ type Config struct {
 	// in the absence of ACLs
 	EnableDebug bool
 
-	// EnableEventPublisher is used to enable or disable state store
+	// EnableEventBroker is used to enable or disable state store
 	// event publishing
-	EnableEventPublisher bool
+	EnableEventBroker bool
 
 	// EventBufferSize is the amount of events to hold in memory.
 	EventBufferSize int64
 
+	// DurableEventCount is the amount of events to save to disk when
+	// snapshotting
+	DurableEventCount int64
+
 	// LogOutput is the location to write logs to. If this is not set,
 	// logs will go to stderr.
 	LogOutput io.Writer
@@ -420,7 +424,9 @@ func DefaultConfig() *Config {
 		ReplicationBackoff:   30 * time.Second,
 		SentinelGCInterval:   30 * time.Second,
 		LicenseConfig:        &LicenseConfig{},
-		EnableEventPublisher: true,
+		EnableEventBroker:    true,
+		EventBufferSize:      100,
+		DurableEventCount:    100,
 		AutopilotConfig: &structs.AutopilotConfig{
 			CleanupDeadServers:   true,
 			LastContactThreshold: 200 * time.Millisecond,
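On the wire, the agent handler above emits newline-delimited JSON: each event frame is copied to the flushed writer and terminated with `\n`, and (per the server-side stream code later in this diff) idle periods are filled with heartbeat frames. A raw-HTTP consumer sketch; the agent address, the `topic=Eval:*` filter syntax, the `{}` heartbeat body, and the `Index`/`Events` frame shape are assumptions inferred from this diff rather than guarantees:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:4646/v1/event/stream?topic=Eval:*")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// One JSON object per line (ndjson.org); a frame of "{}" is assumed to
	// be the keepalive heartbeat and carries no events.
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024) // allow large frames
	for scanner.Scan() {
		var frame map[string]json.RawMessage
		if err := json.Unmarshal(scanner.Bytes(), &frame); err != nil {
			log.Fatalf("bad frame %q: %v", scanner.Text(), err)
		}
		if len(frame) == 0 {
			continue // heartbeat
		}
		fmt.Printf("index=%s events=%s\n", frame["Index"], frame["Events"])
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
```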
diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go
index 3ee249cab2d..3b980981496 100644
--- a/nomad/event_endpoint.go
+++ b/nomad/event_endpoint.go
@@ -2,12 +2,14 @@ package nomad
 
 import (
 	"context"
+	"fmt"
 	"io"
+	"io/ioutil"
 	"time"
 
 	"github.com/hashicorp/go-msgpack/codec"
+	"github.com/hashicorp/nomad/acl"
 	"github.com/hashicorp/nomad/helper"
-	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/stream"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -41,35 +43,29 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
 		return
 	}
 
-	// ACL check
-	// TODO(drew) ACL checks need to be per topic
-	// All Events Management
-	// System Events Management
-	// Node Events NamespaceCapabilityReadEvents
-	// Job/Alloc Events NamespaceCapabilityReadEvents
-	if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
+	aclObj, err := e.srv.ResolveToken(args.AuthToken)
+	if err != nil {
 		handleJsonResultError(err, nil, encoder)
 		return
-	} else if aclObj != nil && !aclObj.IsManagement() {
-		handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
-		return
 	}
 
-	// authToken is passed to the subscribe request so the event stream
-	// can handle closing a subscription if the authToken expires.
-	// If ACLs are disabled, a random token is generated and it will
-	// never be closed due to expiry.
-	authToken := args.AuthToken
-	if authToken == "" {
-		authToken = uuid.Generate()
-	}
 	subReq := &stream.SubscribeRequest{
-		Token:     authToken,
+		Token:     args.AuthToken,
 		Topics:    args.Topics,
 		Index:     uint64(args.Index),
 		Namespace: args.Namespace,
 	}
-	publisher, err := e.srv.State().EventPublisher()
+
+	// Check required ACL permissions for requested Topics
+	if aclObj != nil {
+		if err := aclCheckForEvents(subReq, aclObj); err != nil {
+			handleJsonResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
+			return
+		}
+	}
+
+	// Get the servers broker and subscribe
+	publisher, err := e.srv.State().EventBroker()
 	if err != nil {
 		handleJsonResultError(err, helper.Int64ToPtr(500), encoder)
 		return
@@ -86,23 +82,14 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
 	}
 	defer subscription.Unsubscribe()
 
-	ndJsonCh := make(chan *structs.NDJson)
 	errCh := make(chan error)
 
-	jsonStream := stream.NewNDJsonStream(ndJsonCh, 30*time.Second)
-	jsonStream.Run(ctx)
+	jsonStream := stream.NewJsonStream(ctx, 30*time.Second)
 
 	// goroutine to detect remote side closing
 	go func() {
-		if _, err := conn.Read(nil); err != nil {
-			// One end of the pipe explicitly closed, exit
-			cancel()
-			return
-		}
-
-		select {
-		case <-ctx.Done():
-			return
-		}
+		io.Copy(ioutil.Discard, conn)
+		cancel()
 	}()
 
 	go func() {
@@ -140,7 +127,7 @@ OUTER:
 			break OUTER
 		case <-ctx.Done():
 			break OUTER
-		case eventJSON, ok := <-ndJsonCh:
+		case eventJSON, ok := <-jsonStream.OutCh():
 			// check if ndjson may have been closed when an error occurred,
 			// check once more for an error.
 			if !ok {
@@ -209,3 +196,47 @@ func handleJsonResultError(err error, code *int64, encoder *codec.Encoder) {
 		Error: structs.NewRpcError(err, code),
 	})
 }
+
+func aclCheckForEvents(subReq *stream.SubscribeRequest, aclObj *acl.ACL) error {
+	if len(subReq.Topics) == 0 {
+		return fmt.Errorf("invalid topic request")
+	}
+
+	reqPolicies := make(map[string]struct{})
+	var required = struct{}{}
+
+	for topic := range subReq.Topics {
+		switch topic {
+		case structs.TopicDeployment, structs.TopicEval,
+			structs.TopicAlloc, structs.TopicJob:
+			if _, ok := reqPolicies[acl.NamespaceCapabilityReadJob]; !ok {
+				reqPolicies[acl.NamespaceCapabilityReadJob] = required
+			}
+		case structs.TopicNode:
+			reqPolicies["node-read"] = required
+		case structs.TopicAll:
+			reqPolicies["management"] = required
+		default:
+			return fmt.Errorf("unknown topic %s", topic)
+		}
+	}
+
+	for checks := range reqPolicies {
+		switch checks {
+		case acl.NamespaceCapabilityReadJob:
+			if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
+				return structs.ErrPermissionDenied
+			}
+		case "node-read":
+			if ok := aclObj.AllowNodeRead(); !ok {
+				return structs.ErrPermissionDenied
+			}
+		case "management":
+			if ok := aclObj.IsManagement(); !ok {
+				return structs.ErrPermissionDenied
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go
index 573084ff313..26757d37ca3 100644
--- a/nomad/event_endpoint_test.go
+++ b/nomad/event_endpoint_test.go
@@ -23,7 +23,7 @@ func TestEventStream(t *testing.T) {
 	t.Parallel()
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
-		c.EnableEventPublisher = true
+		c.EnableEventBroker = true
 	})
 	defer cleanupS1()
 
@@ -65,7 +65,7 @@ func TestEventStream(t *testing.T) {
 	}()
 
 	// retrieve publisher for server, send event
-	publisher, err := s1.State().EventPublisher()
+	publisher, err := s1.State().EventBroker()
 	require.NoError(t, err)
 
 	node := mock.Node()
@@ -75,7 +75,12 @@ func TestEventStream(t *testing.T) {
 	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
 	require.Nil(t, encoder.Encode(req))
 
+	publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
+	publisher.Publish(&structs.Events{Index: uint64(3), Events: []structs.Event{{Topic: "test", Payload: node}}})
+
 	timeout := time.After(3 * time.Second)
+	got := 0
+	want := 3
 OUTER:
 	for {
 		select {
@@ -89,7 +94,7 @@ OUTER:
 			}
 
 			// ignore heartbeat
-			if msg.Event == stream.NDJsonHeartbeat {
+			if msg.Event == stream.JsonHeartbeat {
 				continue
 			}
 
@@ -107,7 +112,11 @@ OUTER:
 			dec.Decode(event.Events[0].Payload)
 			require.NoError(t, err)
 			require.Equal(t, node.ID, out.ID)
-			break OUTER
+
+			got++
+			if got == want {
+				break OUTER
+			}
 		}
 	}
 }
@@ -118,7 +127,7 @@ func TestEventStream_StreamErr(t *testing.T) {
 	t.Parallel()
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
-		c.EnableEventPublisher = true
+		c.EnableEventBroker = true
 	})
 	defer cleanupS1()
 
@@ -158,7 +167,7 @@ func TestEventStream_StreamErr(t *testing.T) {
 		}
 	}()
 
-	publisher, err := s1.State().EventPublisher()
+	publisher, err := s1.State().EventBroker()
 	require.NoError(t, err)
 
 	node := mock.Node()
@@ -200,12 +209,12 @@ func TestEventStream_RegionForward(t *testing.T) {
 	t.Parallel()
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
-		c.EnableEventPublisher = true
+		c.EnableEventBroker = true
 	})
 	defer cleanupS1()
 
 	s2, cleanupS2 := TestServer(t, func(c *Config) {
-		c.EnableEventPublisher = true
+		c.EnableEventBroker = true
 		c.Region = "foo"
 	})
 	defer cleanupS2()
@@ -249,7 +258,7 @@ func TestEventStream_RegionForward(t *testing.T) {
 	}()
 
 	// publish with server 2
-	publisher, err := s2.State().EventPublisher()
+	publisher, err := s2.State().EventBroker()
 	require.NoError(t, err)
 
 	node := mock.Node()
@@ -272,7 +281,7 @@ OUTER:
 				t.Fatalf("Got error: %v", msg.Error.Error())
 			}
 
-			if msg.Event == stream.NDJsonHeartbeat {
+			if msg.Event == stream.JsonHeartbeat {
 				continue
 			}
 
@@ -306,35 +315,124 @@ func TestEventStream_ACL(t *testing.T) {
 	policyBad := mock.NamespacePolicy("other", "", []string{acl.NamespaceCapabilityReadFS})
 	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
 
+	policyNsGood := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
+	tokenNsFoo := mock.CreatePolicyAndToken(t, s.State(), 1006, "valid", policyNsGood)
+
+	policyNsNode := mock.NamespacePolicy("foo", "", []string{acl.NamespaceCapabilityReadJob})
+	policyNsNode += "\n" + mock.NodePolicy("read")
+	tokenNsNode := mock.CreatePolicyAndToken(t, s.State(), 1007, "validnNsNode", policyNsNode)
+
 	cases := []struct {
 		Name        string
 		Token       string
+		Topics      map[structs.Topic][]string
+		Namespace   string
 		ExpectedErr string
+		PublishFn   func(p *stream.EventBroker)
 	}{
 		{
-			Name:        "no token",
-			Token:       "",
+			Name:  "no token",
+			Token: "",
+			Topics: map[structs.Topic][]string{
+				"*": {"*"},
+			},
+			ExpectedErr: structs.ErrPermissionDenied.Error(),
+		},
+		{
+			Name:  "bad token",
+			Token: tokenBad.SecretID,
+			Topics: map[structs.Topic][]string{
+				"*": {"*"},
+			},
+			ExpectedErr: structs.ErrPermissionDenied.Error(),
+		},
+		{
+			Name:  "root token",
+			Token: root.SecretID,
+			Topics: map[structs.Topic][]string{
+				"*": {"*"},
+			},
+			ExpectedErr: "subscription closed by server",
+		},
+		{
+			Name:  "job namespace token - correct ns",
+			Token: tokenNsFoo.SecretID,
+			Topics: map[structs.Topic][]string{
+				"Job":        {"*"},
+				"Eval":       {"*"},
+				"Alloc":      {"*"},
+				"Deployment": {"*"},
+			},
+			Namespace:   "foo",
+			ExpectedErr: "subscription closed by server",
+			PublishFn: func(p *stream.EventBroker) {
+				p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
+			},
+		},
+		{
+			Name:  "job namespace token - incorrect ns",
+			Token: tokenNsFoo.SecretID,
+			Topics: map[structs.Topic][]string{
+				"Job": {"*"}, // good
+			},
+			Namespace:   "bar", // bad
 			ExpectedErr: structs.ErrPermissionDenied.Error(),
+			PublishFn: func(p *stream.EventBroker) {
+				p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
+			},
 		},
 		{
-			Name:        "bad token",
-			Token:       tokenBad.SecretID,
+			Name:  "job namespace token - request management topic",
+			Token: tokenNsFoo.SecretID,
+			Topics: map[structs.Topic][]string{
+				"*": {"*"}, // bad
+			},
+			Namespace:   "foo",
 			ExpectedErr: structs.ErrPermissionDenied.Error(),
+			PublishFn: func(p *stream.EventBroker) {
+				p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
+			},
 		},
 		{
-			Name:        "root token",
-			Token:       root.SecretID,
+			Name:  "job namespace token - request invalid node topic",
+			Token: tokenNsFoo.SecretID,
+			Topics: map[structs.Topic][]string{
+				"Eval": {"*"}, // good
+				"Node": {"*"}, // bad
+			},
+			Namespace:   "foo",
+			ExpectedErr: structs.ErrPermissionDenied.Error(),
+			PublishFn: func(p *stream.EventBroker) {
+				p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Job", Namespace: "foo", Payload: mock.Job()}}})
+			},
+		},
+		{
+			Name:  "job+node namespace token, valid",
+			Token: tokenNsNode.SecretID,
+			Topics: map[structs.Topic][]string{
+				"Eval": {"*"}, // good
+				"Node": {"*"}, // good
+			},
+			Namespace:   "foo",
 			ExpectedErr: "subscription closed by server",
+			PublishFn: func(p *stream.EventBroker) {
+				p.Publish(&structs.Events{Index: uint64(1000), Events: []structs.Event{{Topic: "Node", Payload: mock.Node()}}})
+			},
 		},
 	}
 
 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
+			var ns string
+			if tc.Namespace != "" {
+				ns = tc.Namespace
+			}
 			// Create request for all topics and keys
 			req := structs.EventStreamRequest{
-				Topics: map[structs.Topic][]string{"*": {"*"}},
+				Topics: tc.Topics,
 				QueryOptions: structs.QueryOptions{
 					Region:    s.Region(),
+					Namespace: ns,
 					AuthToken: tc.Token,
 				},
 			}
@@ -372,20 +470,25 @@ func TestEventStream_ACL(t *testing.T) {
 			encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
 			require.Nil(encoder.Encode(req))
 
-			publisher, err := s.State().EventPublisher()
+			publisher, err := s.State().EventBroker()
 			require.NoError(err)
 
 			// publish some events
 			node := mock.Node()
+
 			publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: "test", Payload: node}}})
 			publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: "test", Payload: node}}})
 
+			if tc.PublishFn != nil {
+				tc.PublishFn(publisher)
+			}
+
 			timeout := time.After(5 * time.Second)
 		OUTER:
 			for {
 				select {
 				case <-timeout:
-					require.Fail("timeout waiting for response")
+					t.Fatal("timeout waiting for events")
 				case err := <-errCh:
 					t.Fatal(err)
 				case msg := <-streamMsg:
@@ -398,7 +501,7 @@ func TestEventStream_ACL(t *testing.T) {
 					if strings.Contains(msg.Error.Error(), tc.ExpectedErr) {
 						break OUTER
 					} else {
-						require.Fail("Unexpected error", msg.Error)
+						t.Fatalf("unexpected error %v", msg.Error)
 					}
 				}
 			}
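`aclCheckForEvents` replaces the old management-token-only gate with a per-topic capability check: Deployment, Eval, Alloc, and Job topics require `read-job` in the request namespace, Node requires node `read`, and the `*` firehose still requires a management token, which is exactly what the ACL test cases above exercise. A self-contained mirror of that mapping for reference; the helper and its string capability names are illustrative, not part of Nomad:

```go
package main

import "fmt"

type topic string

const (
	topicDeployment topic = "Deployment"
	topicEval       topic = "Eval"
	topicAlloc      topic = "Alloc"
	topicJob        topic = "Job"
	topicNode       topic = "Node"
	topicAll        topic = "*"
)

// requiredCapability reports what a token must be allowed to do to
// subscribe to a topic, per the switch in aclCheckForEvents above.
func requiredCapability(t topic) (string, error) {
	switch t {
	case topicDeployment, topicEval, topicAlloc, topicJob:
		return "namespace:read-job", nil
	case topicNode:
		return "node:read", nil
	case topicAll:
		return "management", nil
	default:
		return "", fmt.Errorf("unknown topic %s", t)
	}
}

func main() {
	for _, t := range []topic{topicJob, topicNode, topicAll} {
		cap, _ := requiredCapability(t)
		fmt.Printf("%-10s -> %s\n", t, cap)
	}
}
```

Because the check runs once per subscription, a request mixing an allowed topic with a disallowed one (the "request invalid node topic" case) is rejected outright rather than partially filtered.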
"github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" @@ -102,7 +103,7 @@ type nomadFSM struct { type nomadSnapshot struct { snap *state.StateSnapshot timetable *TimeTable - durableEventCount int + durableEventCount int64 } // snapshotHeader is the first entry in our snapshot @@ -128,14 +129,17 @@ type FSMConfig struct { // Region is the region of the server embedding the FSM Region string - EnableEventPublisher bool + // EnableEventBroker specifies if the FSMs state store should enable + // it's event publisher. + EnableEventBroker bool + // EventBufferSize is the amount of messages to hold in memory EventBufferSize int64 // Durable count specifies the amount of events generated by the state store // to save to disk during snapshot generation. The most recent events // limited to count will be saved. - DurableEventCount int + DurableEventCount int64 } // NewFSMPath is used to construct a new FSM with a blank state @@ -144,7 +148,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { sconfig := &state.StateStoreConfig{ Logger: config.Logger, Region: config.Region, - EnablePublisher: config.EnableEventPublisher, + EnablePublisher: config.EnableEventBroker, EventBufferSize: config.EventBufferSize, DurableEventCount: config.DurableEventCount, } @@ -176,7 +180,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) { // Close is used to cleanup resources associated with the FSM func (n *nomadFSM) Close() error { - n.state.StopEventPublisher() + n.state.StopEventBroker() return nil } @@ -1275,7 +1279,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { config := &state.StateStoreConfig{ Logger: n.config.Logger, Region: n.config.Region, - EnablePublisher: n.config.EnableEventPublisher, + EnablePublisher: n.config.EnableEventBroker, EventBufferSize: n.config.EventBufferSize, DurableEventCount: n.config.DurableEventCount, } @@ -1522,6 +1526,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } case EventSnapshot: + // If the event broker is disabled but the snapshot from potentially + // a remote server has events, ignore them + if !n.config.EnableEventBroker { + return nil + } + event := new(structs.Events) if err := dec.Decode(event); err != nil { return err @@ -1543,7 +1553,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { } } - restore.Commit() + if err := restore.Commit(); err != nil { + return err + } // COMPAT Remove in 0.10 // Clean up active deployments that do not have a job @@ -1565,8 +1577,9 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // Rehydrate the new state store's event publisher with the events // persisted in the snapshot - if n.config.EnableEventPublisher { - if err := rehydratePublisherFromState(n.state); err != nil { + if n.config.EnableEventBroker { + n.logger.Debug("Rehydrating event broker events from snapshot") + if err := rehydratePublisherFromState(n.state, n.logger); err != nil { n.logger.Error("Error re-hydrating event publisher during restore", "error", err) } } @@ -1574,8 +1587,11 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return nil } -func rehydratePublisherFromState(s *state.StateStore) error { - pub, err := s.EventPublisher() +// rehydratePublisherFromState is used during a snapshot restore to +// add the persisted events from that snapshot that were just added to memdb +// back into the event publisher +func rehydratePublisherFromState(s *state.StateStore, l hclog.Logger) error { + pub, err := s.EventBroker() if err != 
nil { return err } @@ -1584,6 +1600,7 @@ func rehydratePublisherFromState(s *state.StateStore) error { if err != nil { return err } + count := 0 for { raw := events.Next() if raw == nil { @@ -1591,7 +1608,10 @@ func rehydratePublisherFromState(s *state.StateStore) error { } e := raw.(*structs.Events) pub.Publish(e) + count++ } + + l.Debug("finished hydrating event broker from snapshot", "events", count) return nil } @@ -2386,7 +2406,7 @@ func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Enc return err } - count := 0 + var count int64 for { // Get the next item raw := events.Next() @@ -2397,14 +2417,12 @@ func (s *nomadSnapshot) persistEvents(sink raft.SnapshotSink, encoder *codec.Enc // Prepare the request struct event := raw.(*structs.Events) - eventCount := len(event.Events) - // Write out a volume snapshot sink.Write([]byte{byte(EventSnapshot)}) if err := encoder.Encode(event); err != nil { return err } - count += eventCount + count += int64(len(event.Events)) // Only write to sink until durableCount has been reached if count >= s.durableEventCount { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b1f8d83dd2e..2557e59e86a 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -50,13 +50,13 @@ func testFSM(t *testing.T) *nomadFSM { dispatcher, _ := testPeriodicDispatcher(t) logger := testlog.HCLogger(t) fsmConfig := &FSMConfig{ - EvalBroker: broker, - Periodic: dispatcher, - Blocked: NewBlockedEvals(broker, logger), - Logger: logger, - Region: "global", - EnableEventPublisher: true, - EventBufferSize: 100, + EvalBroker: broker, + Periodic: dispatcher, + Blocked: NewBlockedEvals(broker, logger), + Logger: logger, + Region: "global", + EnableEventBroker: true, + EventBufferSize: 100, } fsm, err := NewFSM(fsmConfig) if err != nil { @@ -3206,7 +3206,7 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) { t.Parallel() // Add some state fsm := testFSM(t) - fsm.config.EnableEventPublisher = true + fsm.config.EnableEventBroker = true // DurableEventCount = 4 each mock events wrapper contains 2 events fsm.config.DurableEventCount = 4 @@ -3246,7 +3246,7 @@ func TestFSM_SnapshotRestore_Events_WithDurability(t *testing.T) { raw1 := iter.Next() require.Nil(t, raw1) - pub, err := state2.EventPublisher() + pub, err := state2.EventBroker() require.NoError(t, err) testutil.WaitForResult(func() (bool, error) { @@ -3264,7 +3264,7 @@ func TestFSM_SnapshotRestore_Events_NoDurability(t *testing.T) { t.Parallel() fsm := testFSM(t) // Enable event publisher with durable event count of zero - fsm.config.EnableEventPublisher = true + fsm.config.EnableEventBroker = true fsm.config.DurableEventCount = 0 state := fsm.State() diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 0f685dd67c2..23baba2e41c 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1917,7 +1917,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { if time.Since(start) < 100*time.Millisecond { t.Fatalf("too fast") } - assert.EqualValues(200, int(resp3.Index)) + assert.EqualValues(200, resp3.Index) if assert.Len(resp3.Allocs, 1) { assert.EqualValues(100, resp3.Allocs[alloc1.ID]) } diff --git a/nomad/server.go b/nomad/server.go index 29757dd3c08..44de55471b9 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1211,13 +1211,14 @@ func (s *Server) setupRaft() error { // Create the FSM fsmConfig := &FSMConfig{ - EvalBroker: s.evalBroker, - Periodic: s.periodicDispatcher, - Blocked: s.blockedEvals, - Logger: s.logger, - 
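`persistEvents` and the durability tests above bound how much of the event buffer survives a snapshot: batches are written to the sink until at least `durableEventCount` individual events have been persisted, and a count of zero persists nothing. A self-contained sketch of that rule with stand-in types (the early-exit for a zero count is implied by `TestFSM_SnapshotRestore_Events_NoDurability`, not shown in the hunk itself):

```go
package main

import "fmt"

type eventBatch struct {
	Index  uint64
	Events []string
}

// persistBounded keeps appending batches to the "snapshot" until at least
// durableCount individual events have been written, then stops.
func persistBounded(batches []eventBatch, durableCount int64) []eventBatch {
	var persisted []eventBatch
	var count int64
	for _, b := range batches {
		if durableCount == 0 {
			break // a count of 0 disables event persistence entirely
		}
		persisted = append(persisted, b)
		count += int64(len(b.Events))
		if count >= durableCount {
			break
		}
	}
	return persisted
}

func main() {
	batches := []eventBatch{
		{Index: 100, Events: []string{"a", "b"}},
		{Index: 101, Events: []string{"c", "d"}},
		{Index: 102, Events: []string{"e", "f"}},
	}
	// Mirrors TestFSM_SnapshotRestore_Events_WithDurability: each wrapper
	// holds 2 events and DurableEventCount is 4, so two batches persist.
	fmt.Println(len(persistBounded(batches, 4))) // 2
}
```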
diff --git a/nomad/server.go b/nomad/server.go
index 29757dd3c08..44de55471b9 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -1211,13 +1211,14 @@ func (s *Server) setupRaft() error {
 
 	// Create the FSM
 	fsmConfig := &FSMConfig{
-		EvalBroker:           s.evalBroker,
-		Periodic:             s.periodicDispatcher,
-		Blocked:              s.blockedEvals,
-		Logger:               s.logger,
-		Region:               s.Region(),
-		EnableEventPublisher: s.config.EnableEventPublisher,
-		EventBufferSize:      s.config.EventBufferSize,
+		EvalBroker:        s.evalBroker,
+		Periodic:          s.periodicDispatcher,
+		Blocked:           s.blockedEvals,
+		Logger:            s.logger,
+		Region:            s.Region(),
+		EnableEventBroker: s.config.EnableEventBroker,
+		DurableEventCount: s.config.DurableEventCount,
+		EventBufferSize:   s.config.EventBufferSize,
 	}
 	var err error
 	s.fsm, err = NewFSM(fsmConfig)
diff --git a/nomad/state/deployment_events_test.go b/nomad/state/deployment_events_test.go
index bbaeb7996ea..08e06e45a8d 100644
--- a/nomad/state/deployment_events_test.go
+++ b/nomad/state/deployment_events_test.go
@@ -5,7 +5,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/stream"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -15,7 +14,7 @@ import (
 func TestDeploymentEventFromChanges(t *testing.T) {
 	t.Parallel()
 	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
-	defer s.StopEventPublisher()
+	defer s.StopEventBroker()
 
 	// setup
 	setupTx := s.db.WriteTxn(10)
@@ -59,82 +58,6 @@ func TestDeploymentEventFromChanges(t *testing.T) {
 
 }
 
-func TestDeploymentEventFromChanges_Promotion(t *testing.T) {
-	t.Parallel()
-	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
-	defer s.StopEventPublisher()
-
-	// setup
-	setupTx := s.db.WriteTxn(10)
-
-	j := mock.Job()
-	tg1 := j.TaskGroups[0]
-	tg2 := tg1.Copy()
-	tg2.Name = "foo"
-	j.TaskGroups = append(j.TaskGroups, tg2)
-	require.NoError(t, s.upsertJobImpl(10, j, false, setupTx))
-
-	d := mock.Deployment()
-	d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
-	d.JobID = j.ID
-	d.TaskGroups = map[string]*structs.DeploymentState{
-		"web": {
-			DesiredTotal:    10,
-			DesiredCanaries: 1,
-		},
-		"foo": {
-			DesiredTotal:    10,
-			DesiredCanaries: 1,
-		},
-	}
-	require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))
-
-	// create set of allocs
-	c1 := mock.Alloc()
-	c1.JobID = j.ID
-	c1.DeploymentID = d.ID
-	d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID)
-	c1.DeploymentStatus = &structs.AllocDeploymentStatus{
-		Healthy: helper.BoolToPtr(true),
-	}
-	c2 := mock.Alloc()
-	c2.JobID = j.ID
-	c2.DeploymentID = d.ID
-	d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID)
-	c2.TaskGroup = tg2.Name
-	c2.DeploymentStatus = &structs.AllocDeploymentStatus{
-		Healthy: helper.BoolToPtr(true),
-	}
-
-	require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx))
-
-	// commit setup transaction
-	setupTx.Txn.Commit()
-
-	e := mock.Eval()
-	// Request to promote canaries
-	msgType := structs.DeploymentPromoteRequestType
-	req := &structs.ApplyDeploymentPromoteRequest{
-		DeploymentPromoteRequest: structs.DeploymentPromoteRequest{
-			DeploymentID: d.ID,
-			All:          true,
-		},
-		Eval: e,
-	}
-
-	require.NoError(t, s.UpdateDeploymentPromotion(msgType, 100, req))
-
-	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
-	require.Len(t, events, 4)
-
-	got := events[0]
-	require.Equal(t, uint64(100), got.Index)
-	require.Equal(t, d.ID, got.Key)
-
-	de := got.Payload.(*DeploymentEvent)
-	require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status)
-}
-
 func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, timeout time.Duration) []structs.Event {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -163,15 +86,19 @@ func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, tim
 }
 
 func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event {
-	pub, err := s.EventPublisher()
+	pub, err := s.EventBroker()
 	require.NoError(t, err)
 
 	sub, err := pub.Subscribe(&stream.SubscribeRequest{
 		Topics: map[structs.Topic][]string{
-			"*": []string{"*"},
+			"*": {"*"},
 		},
-		Index: index,
+		Index:               index,
+		StartExactlyAtIndex: true,
 	})
+	if err != nil {
+		return []structs.Event{}
+	}
 	defer sub.Unsubscribe()
 	require.NoError(t, err)
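`EventsForIndex` now subscribes with `StartExactlyAtIndex`, which, as used here, makes the subscription begin at the requested raft index rather than wherever the broker's buffer currently starts. Draining such a subscription looks roughly like the sketch below; it assumes `stream.Subscription` exposes a blocking `Next(ctx)` that returns a batch of events, which is how `WaitForEvents` consumes it in these tests:

```go
// Hypothetical helper package, for illustration only.
package statesketch

import (
	"context"

	"github.com/hashicorp/nomad/nomad/stream"
	"github.com/hashicorp/nomad/nomad/structs"
)

// drain collects events from an open subscription until max events arrive,
// the context is cancelled, or the broker closes the subscription.
func drain(ctx context.Context, sub *stream.Subscription, max int) ([]structs.Event, error) {
	var out []structs.Event
	for len(out) < max {
		batch, err := sub.Next(ctx)
		if err != nil {
			// e.g. ctx cancelled, or the broker forced the subscriber off
			return out, err
		}
		out = append(out, batch.Events...)
	}
	return out, nil
}
```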
diff --git a/nomad/state/events.go b/nomad/state/events.go
index f396975f67a..a5a65a02b8c 100644
--- a/nomad/state/events.go
+++ b/nomad/state/events.go
@@ -7,22 +7,11 @@ import (
 )
 
 const (
-	TopicDeployment structs.Topic = "Deployment"
-	TopicEval       structs.Topic = "Eval"
-	TopicAlloc      structs.Topic = "Alloc"
-	TopicJob        structs.Topic = "Job"
-	// TopicNodeRegistration stream.Topic = "NodeRegistration"
-	// TopicNodeDeregistration stream.Topic = "NodeDeregistration"
-	// TopicNodeDrain stream.Topic = "NodeDrain"
-	TopicNode structs.Topic = "Node"
-
-	// TODO(drew) Node Events use TopicNode + Type
-	TypeNodeRegistration      = "NodeRegistration"
-	TypeNodeDeregistration    = "NodeDeregistration"
-	TypeNodeEligibilityUpdate = "NodeEligibility"
-	TypeNodeDrain             = "NodeDrain"
-	TypeNodeEvent             = "NodeEvent"
-
+	TypeNodeRegistration      = "NodeRegistration"
+	TypeNodeDeregistration    = "NodeDeregistration"
+	TypeNodeEligibilityUpdate = "NodeEligibility"
+	TypeNodeDrain             = "NodeDrain"
+	TypeNodeEvent             = "NodeEvent"
 	TypeDeploymentUpdate      = "DeploymentStatusUpdate"
 	TypeDeploymentPromotion   = "DeploymentPromotion"
 	TypeDeploymentAllocHealth = "DeploymentAllocHealth"
@@ -36,22 +25,28 @@ const (
 	TypePlanResult = "PlanResult"
 )
 
+// JobEvent holds a newly updated Job.
 type JobEvent struct {
 	Job *structs.Job
 }
 
+// EvalEvent holds a newly updated Eval.
 type EvalEvent struct {
 	Eval *structs.Evaluation
 }
 
+// AllocEvent holds a newly updated Allocation. The
+// Allocs embedded Job has been removed to reduce size.
 type AllocEvent struct {
 	Alloc *structs.Allocation
 }
 
+// DeploymentEvent holds a newly updated Deployment.
 type DeploymentEvent struct {
 	Deployment *structs.Deployment
 }
 
+// NodeEvent holds a newly updated Node
 type NodeEvent struct {
 	Node *structs.Node
 }
@@ -74,43 +69,30 @@ type JobDrainDetails struct {
 	AllocDetails map[string]NodeDrainAllocDetails
 }
 
+var MsgTypeEvents = map[structs.MessageType]string{
+	structs.NodeRegisterRequestType:                 TypeNodeRegistration,
+	structs.UpsertNodeEventsType:                    TypeNodeEvent,
+	structs.EvalUpdateRequestType:                   TypeEvalUpdated,
+	structs.AllocClientUpdateRequestType:            TypeAllocUpdated,
+	structs.JobRegisterRequestType:                  TypeJobRegistered,
+	structs.AllocUpdateRequestType:                  TypeAllocUpdated,
+	structs.NodeUpdateStatusRequestType:             TypeNodeEvent,
+	structs.JobDeregisterRequestType:                TypeJobDeregistered,
+	structs.JobBatchDeregisterRequestType:           TypeJobBatchDeregistered,
+	structs.AllocUpdateDesiredTransitionRequestType: TypeAllocUpdateDesiredStatus,
+	structs.NodeUpdateEligibilityRequestType:        TypeNodeDrain,
+	structs.BatchNodeUpdateDrainRequestType:         TypeNodeDrain,
+	structs.DeploymentStatusUpdateRequestType:       TypeDeploymentUpdate,
+	structs.DeploymentPromoteRequestType:            TypeDeploymentPromotion,
+	structs.DeploymentAllocHealthRequestType:        TypeDeploymentAllocHealth,
+	structs.ApplyPlanResultsRequestType:             TypePlanResult,
+}
+
+// GenericEventsFromChanges returns a set of events for a given set of
+// transaction changes. It currently ignores Delete operations.
 func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, error) {
-	var eventType string
-	switch changes.MsgType {
-	case structs.NodeRegisterRequestType:
-		eventType = TypeNodeRegistration
-	case structs.UpsertNodeEventsType:
-		eventType = TypeNodeEvent
-	case structs.EvalUpdateRequestType:
-		eventType = TypeEvalUpdated
-	case structs.AllocClientUpdateRequestType:
-		eventType = TypeAllocUpdated
-	case structs.JobRegisterRequestType:
-		eventType = TypeJobRegistered
-	case structs.AllocUpdateRequestType:
-		eventType = TypeAllocUpdated
-	case structs.NodeUpdateStatusRequestType:
-		eventType = TypeNodeEvent
-	case structs.JobDeregisterRequestType:
-		eventType = TypeJobDeregistered
-	case structs.JobBatchDeregisterRequestType:
-		eventType = TypeJobBatchDeregistered
-	case structs.AllocUpdateDesiredTransitionRequestType:
-		eventType = TypeAllocUpdateDesiredStatus
-	case structs.NodeUpdateEligibilityRequestType:
-		eventType = TypeNodeDrain
-	case structs.BatchNodeUpdateDrainRequestType:
-		eventType = TypeNodeDrain
-	case structs.DeploymentStatusUpdateRequestType:
-		eventType = TypeDeploymentUpdate
-	case structs.DeploymentPromoteRequestType:
-		eventType = TypeDeploymentPromotion
-	case structs.DeploymentAllocHealthRequestType:
-		eventType = TypeDeploymentAllocHealth
-	case structs.ApplyPlanResultsRequestType:
-		eventType = TypePlanResult
-	default:
-		// unknown request type
+	eventType, ok := MsgTypeEvents[changes.MsgType]
+	if !ok {
 		return nil, nil
 	}
@@ -127,7 +109,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
 			}
 
 			event := structs.Event{
-				Topic:     TopicEval,
+				Topic:     structs.TopicEval,
 				Type:      eventType,
 				Index:     changes.Index,
 				Key:       after.ID,
@@ -149,15 +131,22 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
 			}
 
 			alloc := after.Copy()
+
+			filterKeys := []string{
+				alloc.JobID,
+				alloc.DeploymentID,
+			}
+
 			// remove job info to help keep size of alloc event down
 			alloc.Job = nil
 
 			event := structs.Event{
-				Topic:     TopicAlloc,
-				Type:      eventType,
-				Index:     changes.Index,
-				Key:       after.ID,
-				Namespace: after.Namespace,
+				Topic:      structs.TopicAlloc,
+				Type:       eventType,
+				Index:      changes.Index,
+				Key:        after.ID,
+				FilterKeys: filterKeys,
+				Namespace:  after.Namespace,
 				Payload: &AllocEvent{
 					Alloc: alloc,
 				},
@@ -174,7 +163,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
 			}
 
 			event := structs.Event{
-				Topic:     TopicAlloc,
+				Topic:     structs.TopicJob,
 				Type:      eventType,
 				Index:     changes.Index,
 				Key:       after.ID,
@@ -195,7 +184,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
 			}
 
 			event := structs.Event{
-				Topic: TopicNode,
+				Topic: structs.TopicNode,
 				Type:  eventType,
 				Index: changes.Index,
 				Key:   after.ID,
@@ -214,7 +203,7 @@ func GenericEventsFromChanges(tx ReadTxn, changes Changes) (*structs.Events, err
 			}
 
 			event := structs.Event{
-				Topic: TopicNode,
+				Topic: structs.TopicDeployment,
 				Type:  eventType,
 				Index: changes.Index,
 				Key:   after.ID,
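Collapsing the per-message-type switch into the exported `MsgTypeEvents` map turns the raft-message-to-event-type mapping into plain data, so it can be asserted table-style and reused elsewhere. An illustrative (not actual) test sketch; the message type constants follow the diff, and the unmapped value is an assumption:

```go
func TestMsgTypeEvents_Lookup(t *testing.T) {
	cases := map[structs.MessageType]string{
		structs.NodeRegisterRequestType:      TypeNodeRegistration,
		structs.DeploymentPromoteRequestType: TypeDeploymentPromotion,
		structs.ApplyPlanResultsRequestType:  TypePlanResult,
	}
	for msgType, want := range cases {
		got, ok := MsgTypeEvents[msgType]
		if !ok || got != want {
			t.Fatalf("MsgTypeEvents[%v] = %q, ok=%v; want %q", msgType, got, ok, want)
		}
	}

	// Unmapped raft messages produce no event: GenericEventsFromChanges
	// returns (nil, nil) when the lookup misses. 255 is assumed unused here.
	if _, ok := MsgTypeEvents[structs.MessageType(255)]; ok {
		t.Fatal("expected no event type for an unmapped message")
	}
}
```

Note the two corrected topics in the hunks above: job events now publish on `structs.TopicJob` and deployment events on `structs.TopicDeployment`, where both previously reused the wrong topic constants.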
diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go
new file mode 100644
index 00000000000..ba93adedb95
--- /dev/null
+++ b/nomad/state/events_test.go
@@ -0,0 +1,567 @@
+package state
+
+import (
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/helper"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/stretchr/testify/require"
+)
+
+// TODO: message types still to cover:
+// structs.AllocClientUpdateRequestType
+// structs.AllocUpdateRequestType
+// structs.JobDeregisterRequestType
+// structs.JobRegisterRequestType
+
+func TestGenericEventsFromChanges_DeploymentUpdate(t *testing.T) {
+	t.Parallel()
+	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
+	defer s.StopEventBroker()
+
+	// setup
+	setupTx := s.db.WriteTxn(10)
+
+	j := mock.Job()
+	e := mock.Eval()
+	e.JobID = j.ID
+
+	d := mock.Deployment()
+	d.JobID = j.ID
+
+	require.NoError(t, s.upsertJobImpl(10, j, false, setupTx))
+	require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))
+
+	setupTx.Txn.Commit()
+
+	msgType := structs.DeploymentStatusUpdateRequestType
+
+	req := &structs.DeploymentStatusUpdateRequest{
+		DeploymentUpdate: &structs.DeploymentStatusUpdate{
+			DeploymentID:      d.ID,
+			Status:            structs.DeploymentStatusPaused,
+			StatusDescription: structs.DeploymentStatusDescriptionPaused,
+		},
+		Eval: e,
+		// Exclude Job and assert it's added
+	}
+
+	require.NoError(t, s.UpdateDeploymentStatus(msgType, 100, req))
+
+	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
+	require.Len(t, events, 2)
+
+	got := events[0]
+	require.Equal(t, uint64(100), got.Index)
+	require.Equal(t, d.ID, got.Key)
+
+	de := got.Payload.(*DeploymentEvent)
+	require.Equal(t, structs.DeploymentStatusPaused, de.Deployment.Status)
+	require.Contains(t, got.FilterKeys, j.ID)
+}
+
+func TestGenericEventsFromChanges_DeploymentPromotion(t *testing.T) {
+	t.Parallel()
+	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
+	defer s.StopEventBroker()
+
+	// setup
+	setupTx := s.db.WriteTxn(10)
+
+	j := mock.Job()
+	tg1 := j.TaskGroups[0]
+	tg2 := tg1.Copy()
+	tg2.Name = "foo"
+	j.TaskGroups = append(j.TaskGroups, tg2)
+	require.NoError(t, s.upsertJobImpl(10, j, false, setupTx))
+
+	d := mock.Deployment()
+	d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
+	d.JobID = j.ID
+	d.TaskGroups = map[string]*structs.DeploymentState{
+		"web": {
+			DesiredTotal:    10,
+			DesiredCanaries: 1,
+		},
+		"foo": {
+			DesiredTotal:    10,
+			DesiredCanaries: 1,
+		},
+	}
+	require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))
+
+	// create set of allocs
+	c1 := mock.Alloc()
+	c1.JobID = j.ID
+	c1.DeploymentID = d.ID
+	d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID)
+	c1.DeploymentStatus = &structs.AllocDeploymentStatus{
+		Healthy: helper.BoolToPtr(true),
+	}
+	c2 := mock.Alloc()
+	c2.JobID = j.ID
+	c2.DeploymentID = d.ID
+	d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID)
+	c2.TaskGroup = tg2.Name
+	c2.DeploymentStatus = &structs.AllocDeploymentStatus{
+		Healthy: helper.BoolToPtr(true),
+	}
+
+	require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx))
+
+	// commit setup transaction
+	setupTx.Txn.Commit()
+
+	e := mock.Eval()
+	// Request to promote canaries
+	msgType := structs.DeploymentPromoteRequestType
+	req := &structs.ApplyDeploymentPromoteRequest{
+		DeploymentPromoteRequest: structs.DeploymentPromoteRequest{
+			DeploymentID: d.ID,
+			All:          true,
+		},
+		Eval: e,
+	}
+
+	require.NoError(t, s.UpdateDeploymentPromotion(msgType, 100, req))
+
+	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
+	require.Len(t, events, 4)
+
+	got := events[0]
+	require.Equal(t, uint64(100), got.Index)
+	require.Equal(t, d.ID, got.Key)
+
+	de := got.Payload.(*DeploymentEvent)
+	require.Equal(t, structs.DeploymentStatusRunning, de.Deployment.Status)
+	require.Equal(t, TypeDeploymentPromotion, got.Type)
+}
+
+func TestGenericEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) {
+	t.Parallel()
+	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
+	defer s.StopEventBroker()
+
+	// setup
+	setupTx := s.db.WriteTxn(10)
+
+	j := mock.Job()
+	tg1 := j.TaskGroups[0]
+	tg2 := tg1.Copy()
+	tg2.Name = "foo"
+	j.TaskGroups = append(j.TaskGroups, tg2)
+	require.NoError(t, s.upsertJobImpl(10, j, false, setupTx))
+
+	d := mock.Deployment()
+	d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
+	d.JobID = j.ID
+	d.TaskGroups = map[string]*structs.DeploymentState{
+		"web": {
+			DesiredTotal:    10,
+			DesiredCanaries: 1,
+		},
+		"foo": {
+			DesiredTotal:    10,
+			DesiredCanaries: 1,
+		},
+	}
+	require.NoError(t, s.upsertDeploymentImpl(10, d, setupTx))
+
+	// create set of allocs
+	c1 := mock.Alloc()
+	c1.JobID = j.ID
+	c1.DeploymentID = d.ID
+	d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID)
+	c1.DeploymentStatus = &structs.AllocDeploymentStatus{
+		Healthy: helper.BoolToPtr(true),
+	}
+	c2 := mock.Alloc()
+	c2.JobID = j.ID
+	c2.DeploymentID = d.ID
+	d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID)
+	c2.TaskGroup = tg2.Name
+	c2.DeploymentStatus = &structs.AllocDeploymentStatus{
+		Healthy: helper.BoolToPtr(true),
+	}
+
+	require.NoError(t, s.upsertAllocsImpl(10, []*structs.Allocation{c1, c2}, setupTx))
+
+	// Commit setup
+	setupTx.Commit()
+
+	msgType := structs.DeploymentAllocHealthRequestType
+
+	req := &structs.ApplyDeploymentAllocHealthRequest{
+		DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
+			DeploymentID:           d.ID,
+			HealthyAllocationIDs:   []string{c1.ID},
+			UnhealthyAllocationIDs: []string{c2.ID},
+		},
+		DeploymentUpdate: &structs.DeploymentStatusUpdate{
+			DeploymentID: d.ID,
+		},
+	}
+
+	require.NoError(t, s.UpdateDeploymentAllocHealth(msgType, 100, req))
+
+	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
+	require.Len(t, events, 3)
+
+	var allocEvents []structs.Event
+	var deploymentEvent []structs.Event
+	for _, e := range events {
+		if e.Topic == structs.TopicAlloc {
+			allocEvents = append(allocEvents, e)
+		} else if e.Topic == structs.TopicDeployment {
+			deploymentEvent = append(deploymentEvent, e)
+		}
+	}
+
+	require.Len(t, allocEvents, 2)
+	for _, e := range allocEvents {
+		require.Equal(t, 100, int(e.Index))
+		require.Equal(t, TypeDeploymentAllocHealth, e.Type)
+		require.Equal(t, structs.TopicAlloc, e.Topic)
+	}
+
+	require.Len(t, deploymentEvent, 1)
+	for _, e := range deploymentEvent {
+		require.Equal(t, 100, int(e.Index))
+		require.Equal(t, TypeDeploymentAllocHealth, e.Type)
+		require.Equal(t, structs.TopicDeployment, e.Topic)
+		require.Equal(t, d.ID, e.Key)
+	}
+}
+
+func TestGenericEventsFromChanges_UpsertNodeEventsType(t *testing.T) {
+	t.Parallel()
+	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
+	defer s.StopEventBroker()
+
+	// setup
+	n1 := mock.Node()
+	n2 := mock.Node()
+
+	require.NoError(t, s.UpsertNode(10, n1))
+	require.NoError(t, s.UpsertNode(12, n2))
+
+	msgType := structs.UpsertNodeEventsType
+	req := &structs.EmitNodeEventsRequest{
+		NodeEvents: map[string][]*structs.NodeEvent{
+			n1.ID: {
+				{
+					Message: "update",
+				},
+			},
+			n2.ID: {
+				{
+					Message: "update",
+				},
+			},
+		},
+	}
+
+	require.NoError(t, s.UpsertNodeEventsMsgType(msgType, 100, req.NodeEvents))
+	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
+	require.Len(t, events, 2)
+
+	for _, e := range events {
+		require.Equal(t, structs.TopicNode, e.Topic)
+		require.Equal(t, TypeNodeEvent, e.Type)
+		event := e.Payload.(*NodeEvent)
+		require.Equal(t, "update", event.Node.Events[len(event.Node.Events)-1].Message)
+	}
+
+}
+
+func TestGenericEventsFromChanges_NodeUpdateStatusRequest(t *testing.T) {
+	t.Parallel()
+	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
+	defer s.StopEventBroker()
+
+	// setup
+	n1 := mock.Node()
+
+	require.NoError(t, s.UpsertNode(10, n1))
+
+	updated := time.Now()
+	msgType := structs.NodeUpdateStatusRequestType
+	req := &structs.NodeUpdateStatusRequest{
+		NodeID:    n1.ID,
+		Status:    structs.NodeStatusDown,
+		UpdatedAt: updated.UnixNano(),
+		NodeEvent: &structs.NodeEvent{Message: "down"},
+	}
+
+	require.NoError(t, s.UpdateNodeStatus(msgType, 100, req.NodeID, req.Status, req.UpdatedAt, req.NodeEvent))
+	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
+	require.Len(t, events, 1)
+
+	e := events[0]
+	require.Equal(t, structs.TopicNode, e.Topic)
+	require.Equal(t, TypeNodeEvent, e.Type)
+	event := e.Payload.(*NodeEvent)
+	require.Equal(t, "down", event.Node.Events[len(event.Node.Events)-1].Message)
+	require.Equal(t, structs.NodeStatusDown, event.Node.Status)
+}
+
+func TestGenericEventsFromChanges_EvalUpdateRequestType(t *testing.T) {
+	t.Parallel()
+	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
+	defer s.StopEventBroker()
+
+	// setup
+	e1 := mock.Eval()
+
+	require.NoError(t, s.UpsertEvals(10, []*structs.Evaluation{e1}))
+
+	e2 := mock.Eval()
+	e2.ID = e1.ID
+	e2.JobID = e1.JobID
+	e2.Status = structs.EvalStatusBlocked
+
+	msgType := structs.EvalUpdateRequestType
+	req := &structs.EvalUpdateRequest{
+		Evals: []*structs.Evaluation{e2},
+	}
+
+	require.NoError(t, s.UpsertEvalsMsgType(msgType, 100, req.Evals))
+
+	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
+	require.Len(t, events, 1)
+
+	e := events[0]
+	require.Equal(t, structs.TopicEval, e.Topic)
+	require.Equal(t, TypeEvalUpdated, e.Type)
+	event := e.Payload.(*EvalEvent)
+	require.Equal(t, "blocked", event.Eval.Status)
+}
+
+func TestGenericEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
+	t.Parallel()
+	s := TestStateStoreCfg(t, TestStateStorePublisher(t))
+	defer s.StopEventBroker()
+
+	// setup
+	alloc := mock.Alloc()
+	alloc2 := mock.Alloc()
+	job := alloc.Job
+	alloc.Job = nil
+	alloc2.Job = nil
+
+	d := mock.Deployment()
+	alloc.DeploymentID = d.ID
+	alloc2.DeploymentID = d.ID
+
+	require.NoError(t, s.UpsertJob(9, job))
+
+	eval := mock.Eval()
+	eval.JobID = job.ID
+
+	// Create an eval
+	require.NoError(t, s.UpsertEvals(10, []*structs.Evaluation{eval}))
+
+	msgType := structs.ApplyPlanResultsRequestType
+	req := &structs.ApplyPlanResultsRequest{
+		AllocUpdateRequest: structs.AllocUpdateRequest{
+			Alloc: []*structs.Allocation{alloc, alloc2},
+			Job:   job,
+		},
+		Deployment: d,
+		EvalID:     eval.ID,
+	}
+
+	require.NoError(t, s.UpsertPlanResults(msgType, 100, req))
+
+	events := WaitForEvents(t, s, 100, 1, 1*time.Second)
+	require.Len(t, events, 5)
+
+	var allocs []structs.Event
+	var evals []structs.Event
+	var jobs []structs.Event
+	var deploys []structs.Event
+	for _, e := range events {
+		if e.Topic == structs.TopicAlloc {
+			allocs = append(allocs, e)
+		} else if e.Topic == structs.TopicEval {
+			evals = append(evals, e)
+		} else if e.Topic == structs.TopicJob {
+			jobs = append(jobs, e)
+		} else if e.Topic == structs.TopicDeployment {
+			deploys = append(deploys, e)
+		}
+		require.Equal(t, TypePlanResult, e.Type)
+	}
+	require.Len(t, allocs, 2)
+	require.Len(t, evals, 1)
+	require.Len(t, jobs, 1)
+	require.Len(t, deploys, 1)
+} + +func TestGenericEventsFromChanges_BatchNodeUpdateDrainRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + n1 := mock.Node() + n2 := mock.Node() + + require.NoError(t, s.UpsertNode(10, n1)) + require.NoError(t, s.UpsertNode(11, n2)) + + updated := time.Now() + msgType := structs.BatchNodeUpdateDrainRequestType + + expectedDrain := &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: -1 * time.Second, + }, + } + event := &structs.NodeEvent{ + Message: "Drain strategy enabled", + Subsystem: structs.NodeEventSubsystemDrain, + Timestamp: time.Now(), + } + req := structs.BatchNodeUpdateDrainRequest{ + Updates: map[string]*structs.DrainUpdate{ + n1.ID: { + DrainStrategy: expectedDrain, + }, + n2.ID: { + DrainStrategy: expectedDrain, + }, + }, + NodeEvents: map[string]*structs.NodeEvent{ + n1.ID: event, + n2.ID: event, + }, + UpdatedAt: updated.UnixNano(), + } + + require.NoError(t, s.BatchUpdateNodeDrain(msgType, 100, req.UpdatedAt, req.Updates, req.NodeEvents)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + for _, e := range events { + require.Equal(t, 100, int(e.Index)) + require.Equal(t, TypeNodeDrain, e.Type) + require.Equal(t, structs.TopicNode, e.Topic) + ne := e.Payload.(*NodeEvent) + require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message) + } +} + +func TestGenericEventsFromChanges_NodeUpdateEligibilityRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + // setup + n1 := mock.Node() + + require.NoError(t, s.UpsertNode(10, n1)) + + msgType := structs.NodeUpdateEligibilityRequestType + + event := &structs.NodeEvent{ + Message: "Node marked as ineligible", + Subsystem: structs.NodeEventSubsystemCluster, + Timestamp: time.Now(), + } + + req := structs.NodeUpdateEligibilityRequest{ + NodeID: n1.ID, + NodeEvent: event, + Eligibility: structs.NodeSchedulingIneligible, + UpdatedAt: time.Now().UnixNano(), + } + + require.NoError(t, s.UpdateNodeEligibility(msgType, 100, req.NodeID, req.Eligibility, req.UpdatedAt, req.NodeEvent)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 1) + + for _, e := range events { + require.Equal(t, 100, int(e.Index)) + require.Equal(t, TypeNodeDrain, e.Type) + require.Equal(t, structs.TopicNode, e.Topic) + ne := e.Payload.(*NodeEvent) + require.Equal(t, event.Message, ne.Node.Events[len(ne.Node.Events)-1].Message) + require.Equal(t, structs.NodeSchedulingIneligible, ne.Node.SchedulingEligibility) + } +} + +func TestGenericEventsFromChanges_AllocUpdateDesiredTransitionRequestType(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + alloc := mock.Alloc() + + require.Nil(t, s.UpsertJob(10, alloc.Job)) + require.Nil(t, s.UpsertAllocs(11, []*structs.Allocation{alloc})) + + msgType := structs.AllocUpdateDesiredTransitionRequestType + + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: alloc.Namespace, + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerNodeDrain, + JobID: alloc.Job.ID, + JobModifyIndex: alloc.Job.ModifyIndex, + Status: structs.EvalStatusPending, + } + evals := []*structs.Evaluation{eval} + + req := &structs.AllocUpdateDesiredTransitionRequest{ + Allocs: map[string]*structs.DesiredTransition{ + alloc.ID: {Migrate: helper.BoolToPtr(true)}, + }, + Evals: evals, 
+ } + + require.NoError(t, s.UpdateAllocsDesiredTransitions(msgType, 100, req.Allocs, req.Evals)) + + events := WaitForEvents(t, s, 100, 1, 1*time.Second) + require.Len(t, events, 2) + + var allocs []structs.Event + var evalEvents []structs.Event + for _, e := range events { + if e.Topic == structs.TopicEval { + evalEvents = append(evalEvents, e) + } else if e.Topic == structs.TopicAlloc { + allocs = append(allocs, e) + } else { + require.Fail(t, "unexpected event type") + } + + require.Equal(t, TypeAllocUpdateDesiredStatus, e.Type) + } + + require.Len(t, allocs, 1) + require.Len(t, evalEvents, 1) +} + +func TestGenericEventsFromChanges_JobBatchDeregisterRequestType(t *testing.T) { + // TODO Job batch deregister logic mostly occurs in the FSM + t.SkipNow() + +} +func TestGenericEventsFromChanges_AllocClientUpdateRequestType(t *testing.T) { + t.SkipNow() +} + +func TestGenericEventsFromChanges_AllocUpdateRequestType(t *testing.T) { + t.SkipNow() +} + +func TestGenericEventsFromChanges_JobDeregisterRequestType(t *testing.T) { + t.SkipNow() +} diff --git a/nomad/state/node_events.go b/nomad/state/node_events.go index 4742b1aa587..178262fb99d 100644 --- a/nomad/state/node_events.go +++ b/nomad/state/node_events.go @@ -19,7 +19,7 @@ func NodeDeregisterEventFromChanges(tx ReadTxn, changes Changes) (*structs.Event } event := structs.Event{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Index: changes.Index, Key: before.ID, @@ -66,7 +66,7 @@ func NodeDrainEventFromChanges(tx ReadTxn, changes Changes) (*structs.Events, er } event := structs.Event{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDrain, Index: changes.Index, Key: after.ID, diff --git a/nomad/state/node_events_test.go b/nomad/state/node_events_test.go index 86874814c36..2789ae80f33 100644 --- a/nomad/state/node_events_test.go +++ b/nomad/state/node_events_test.go @@ -21,13 +21,13 @@ func TestNodeEventsFromChanges(t *testing.T) { }{ { MsgType: structs.NodeRegisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "node registered", Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode()) }, WantEvents: []structs.Event{{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeRegistration, Key: testNodeID(), Index: 100, @@ -39,13 +39,13 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.NodeRegisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "node registered initializing", Mutate: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode(nodeNotReady)) }, WantEvents: []structs.Event{{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeRegistration, Key: testNodeID(), Index: 100, @@ -57,7 +57,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.NodeDeregisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "node deregistered", Setup: func(s *StateStore, tx *txn) error { return upsertNodeTxn(tx, tx.Index, testNode()) @@ -66,7 +66,7 @@ func TestNodeEventsFromChanges(t *testing.T) { return deleteNodeTxn(tx, tx.Index, []string{testNodeID()}) }, WantEvents: []structs.Event{{ - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Key: testNodeID(), Index: 100, @@ -78,7 +78,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.NodeDeregisterRequestType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "batch node deregistered", Setup: func(s 
*StateStore, tx *txn) error { require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) @@ -89,7 +89,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, WantEvents: []structs.Event{ { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Key: testNodeID(), Index: 100, @@ -98,7 +98,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, }, { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeDeregistration, Key: testNodeIDTwo(), Index: 100, @@ -111,7 +111,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, { MsgType: structs.UpsertNodeEventsType, - WantTopic: TopicNode, + WantTopic: structs.TopicNode, Name: "batch node events upserted", Setup: func(s *StateStore, tx *txn) error { require.NoError(t, upsertNodeTxn(tx, tx.Index, testNode())) @@ -141,7 +141,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, WantEvents: []structs.Event{ { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeEvent, Key: testNodeID(), Index: 100, @@ -150,7 +150,7 @@ func TestNodeEventsFromChanges(t *testing.T) { }, }, { - Topic: TopicNode, + Topic: structs.TopicNode, Type: TypeNodeEvent, Key: testNodeIDTwo(), Index: 100, @@ -166,7 +166,7 @@ func TestNodeEventsFromChanges(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventPublisher() + defer s.StopEventBroker() if tc.Setup != nil { // Bypass publish mechanism for setup @@ -215,7 +215,7 @@ func TestNodeEventsFromChanges(t *testing.T) { func TestNodeDrainEventFromChanges(t *testing.T) { t.Parallel() s := TestStateStoreCfg(t, TestStateStorePublisher(t)) - defer s.StopEventPublisher() + defer s.StopEventBroker() // setup setupTx := s.db.WriteTxn(10) @@ -251,7 +251,7 @@ func TestNodeDrainEventFromChanges(t *testing.T) { require.Len(t, got.Events, 1) - require.Equal(t, TopicNode, got.Events[0].Topic) + require.Equal(t, structs.TopicNode, got.Events[0].Topic) require.Equal(t, TypeNodeDrain, got.Events[0].Type) require.Equal(t, uint64(100), got.Events[0].Index) @@ -296,10 +296,6 @@ func nodeNotReady(n *structs.Node) { n.Status = structs.NodeStatusInit } -func nodeReady(n *structs.Node) { - n.Status = structs.NodeStatusReady -} - func nodeIDTwo(n *structs.Node) { n.ID = testNodeIDTwo() } diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index c94545d30d8..c7a2d9634e3 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -27,29 +27,20 @@ type Changes struct { // changeTrackerDB is a thin wrapper around memdb.DB which enables TrackChanges on // all write transactions. When the transaction is committed the changes are -// sent to the EventPublisher which will create and emit change events. +// sent to the EventBroker which will create and emit change events. 
type changeTrackerDB struct { - db *memdb.MemDB + memdb *memdb.MemDB durableCount int64 - publisher *stream.EventPublisher + publisher *stream.EventBroker processChanges func(ReadTxn, Changes) (*structs.Events, error) } -// ChangeConfig -type ChangeConfig struct { - DurableEventCount int -} - -func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventPublisher, changesFn changeProcessor, cfg *ChangeConfig) *changeTrackerDB { - if cfg == nil { - cfg = &ChangeConfig{} - } - +func NewChangeTrackerDB(db *memdb.MemDB, publisher *stream.EventBroker, changesFn changeProcessor, durableCount int64) *changeTrackerDB { return &changeTrackerDB{ - db: db, + memdb: db, publisher: publisher, processChanges: changesFn, - durableCount: int64(cfg.DurableEventCount), + durableCount: durableCount, } } @@ -63,7 +54,7 @@ func noOpProcessChanges(ReadTxn, Changes) (*structs.Events, error) { return nil, // TODO: this could return a regular memdb.Txn if all the state functions accepted // the ReadTxn interface func (c *changeTrackerDB) ReadTxn() *txn { - return &txn{Txn: c.db.Txn(false)} + return &txn{Txn: c.memdb.Txn(false)} } // WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store. @@ -78,7 +69,7 @@ func (c *changeTrackerDB) ReadTxn() *txn { // data directly into the DB. These cases may use WriteTxnRestore. func (c *changeTrackerDB) WriteTxn(idx uint64) *txn { t := &txn{ - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: idx, publish: c.publish, } @@ -91,7 +82,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) t := &txn{ msgType: msgType, - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: idx, publish: c.publish, persistChanges: persistChanges, @@ -101,7 +92,7 @@ func (c *changeTrackerDB) WriteTxnMsgT(msgType structs.MessageType, idx uint64) } func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) { - readOnlyTx := c.db.Txn(false) + readOnlyTx := c.memdb.Txn(false) defer readOnlyTx.Abort() events, err := c.processChanges(readOnlyTx, changes) @@ -123,12 +114,12 @@ func (c *changeTrackerDB) publish(changes Changes) (*structs.Events, error) { // written across many indexes. func (c *changeTrackerDB) WriteTxnRestore() *txn { return &txn{ - Txn: c.db.Txn(true), + Txn: c.memdb.Txn(true), Index: 0, } } -// txn wraps a memdb.Txn to capture changes and send them to the EventPublisher. +// txn wraps a memdb.Txn to capture changes and send them to the EventBroker. // // This can not be done with txn.Defer because the callback passed to Defer is // invoked after commit completes, and because the callback can not return an @@ -149,7 +140,7 @@ type txn struct { publish func(changes Changes) (*structs.Events, error) } -// Commit first pushes changes to EventPublisher, then calls Commit on the +// Commit first pushes changes to EventBroker, then calls Commit on the // underlying transaction. 
// // Note that this function, unlike memdb.Txn, returns an error which must be checked @@ -195,51 +186,11 @@ func processDBChanges(tx ReadTxn, changes Changes) (*structs.Events, error) { case structs.IgnoreUnknownTypeFlag: // unknown event type return nil, nil - case structs.NodeRegisterRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.NodeUpdateStatusRequestType: - // TODO(drew) test - return GenericEventsFromChanges(tx, changes) case structs.NodeDeregisterRequestType: return NodeDeregisterEventFromChanges(tx, changes) case structs.NodeUpdateDrainRequestType: return NodeDrainEventFromChanges(tx, changes) - case structs.UpsertNodeEventsType: - return GenericEventsFromChanges(tx, changes) - case structs.DeploymentStatusUpdateRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.DeploymentPromoteRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.DeploymentAllocHealthRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.ApplyPlanResultsRequestType: - // TODO test - return GenericEventsFromChanges(tx, changes) - case structs.EvalUpdateRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.AllocClientUpdateRequestType: - return GenericEventsFromChanges(tx, changes) - case structs.JobRegisterRequestType: - // TODO(drew) test - return GenericEventsFromChanges(tx, changes) - case structs.AllocUpdateRequestType: - // TODO(drew) test - return GenericEventsFromChanges(tx, changes) - case structs.JobDeregisterRequestType: - // TODO(drew) test / handle delete - return GenericEventsFromChanges(tx, changes) - case structs.JobBatchDeregisterRequestType: - // TODO(drew) test & handle delete - return GenericEventsFromChanges(tx, changes) - case structs.AllocUpdateDesiredTransitionRequestType: - // TODO(drew) drain - return GenericEventsFromChanges(tx, changes) - case structs.NodeUpdateEligibilityRequestType: - // TODO(drew) test, drain - return GenericEventsFromChanges(tx, changes) - case structs.BatchNodeUpdateDrainRequestType: - // TODO(drew) test, drain + default: return GenericEventsFromChanges(tx, changes) } - return nil, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b50c809308d..ebb3f0a661d 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -54,7 +54,7 @@ type StateStoreConfig struct { // DurableEventCount is used to determine if events from transaction changes // should be saved in go-memdb - DurableEventCount int + DurableEventCount int64 } // The StateStore is responsible for maintaining all the Nomad @@ -77,7 +77,7 @@ type StateStore struct { // TODO: refactor abondonCh to use a context so that both can use the same // cancel mechanism. 
- stopEventPublisher func() + stopEventBroker func() } // NewStateStore is used to create a new state store @@ -91,27 +91,22 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { // Create the state store ctx, cancel := context.WithCancel(context.TODO()) s := &StateStore{ - logger: config.Logger.Named("state_store"), - config: config, - abandonCh: make(chan struct{}), - stopEventPublisher: cancel, + logger: config.Logger.Named("state_store"), + config: config, + abandonCh: make(chan struct{}), + stopEventBroker: cancel, } if config.EnablePublisher { - cfg := &ChangeConfig{ - DurableEventCount: config.DurableEventCount, - } - // Create new event publisher using provided config - publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ - EventBufferTTL: 1 * time.Hour, + broker := stream.NewEventBroker(ctx, stream.EventBrokerCfg{ EventBufferSize: config.EventBufferSize, Logger: config.Logger, - OnEvict: s.eventPublisherEvict, + OnEvict: s.eventBrokerEvict, }) - s.db = NewChangeTrackerDB(db, publisher, processDBChanges, cfg) + s.db = NewChangeTrackerDB(db, broker, processDBChanges, config.DurableEventCount) } else { - s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges, nil) + s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges, 0) } // Initialize the state store with required enterprise objects @@ -122,16 +117,16 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { return s, nil } -func (s *StateStore) EventPublisher() (*stream.EventPublisher, error) { +func (s *StateStore) EventBroker() (*stream.EventBroker, error) { if s.db.publisher == nil { - return nil, fmt.Errorf("EventPublisher not configured") + return nil, fmt.Errorf("EventBroker not configured") } return s.db.publisher, nil } -// eventPublisherEvict is used as a callback to delete an evicted events +// eventBrokerEvict is used as a callback to delete an evicted events // entry from go-memdb. -func (s *StateStore) eventPublisherEvict(events *structs.Events) { +func (s *StateStore) eventBrokerEvict(events *structs.Events) { if err := s.deleteEvent(events); err != nil { if err == memdb.ErrNotFound { s.logger.Info("Evicted event was not found in go-memdb table", "event index", events.Index) @@ -142,7 +137,7 @@ func (s *StateStore) eventPublisherEvict(events *structs.Events) { } func (s *StateStore) deleteEvent(events *structs.Events) error { - txn := s.db.db.Txn(true) + txn := s.db.memdb.Txn(true) defer txn.Abort() if err := txn.Delete("events", events); err != nil { @@ -162,7 +157,7 @@ func (s *StateStore) Config() *StateStoreConfig { // we use MemDB, we just need to snapshot the state of the underlying // database. func (s *StateStore) Snapshot() (*StateSnapshot, error) { - memDBSnap := s.db.db.Snapshot() + memDBSnap := s.db.memdb.Snapshot() store := StateStore{ logger: s.logger, @@ -170,7 +165,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { } // Create a new change tracker DB that does not publish or track changes - store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges, nil) + store.db = NewChangeTrackerDB(memDBSnap, nil, noOpProcessChanges, 0) snap := &StateSnapshot{ StateStore: store, @@ -253,14 +248,14 @@ func (s *StateStore) AbandonCh() <-chan struct{} { // Abandon is used to signal that the given state store has been abandoned. // Calling this more than one time will panic. 
func (s *StateStore) Abandon() { - s.StopEventPublisher() + s.StopEventBroker() close(s.abandonCh) } -// StopStopEventPublisher calls the cancel func for the state stores event +// StopEventBroker calls the cancel func for the state store's event // publisher. It should be called during server shutdown. -func (s *StateStore) StopEventPublisher() { - s.stopEventPublisher() +func (s *StateStore) StopEventBroker() { + s.stopEventBroker() } // QueryFn is the definition of a function that can be used to implement a basic @@ -340,7 +335,7 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64 } } - // Update the status of dmsgType structs.MessageType by the plan. + // Update the status of deployments affected by the plan. if len(results.DeploymentUpdates) != 0 { s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn) } @@ -790,6 +785,7 @@ func (s *StateStore) ScalingEventsByJob(ws memdb.WatchSet, namespace, jobID stri // UpsertNodeMsgType is used to register a node or update a node definition // This is assumed to be triggered by the client, so we retain the value // of drain/eligibility which is set by the scheduler. +// TODO(drew) remove this and update all test callers of UpsertNode to use msgType func (s *StateStore) UpsertNodeMsgType(msgType structs.MessageType, index uint64, node *structs.Node) error { txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() @@ -972,7 +968,7 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update // BatchUpdateNodeDrain is used to update the drain of a node set of nodes func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() for node, update := range updates { if err := s.updateNodeDrainImpl(txn, index, node, update.DrainStrategy, update.MarkEligible, updatedAt, events[node]); err != nil { @@ -5900,8 +5896,8 @@ func (s *StateRestore) Abort() { } // Commit is used to commit the restore operation -func (s *StateRestore) Commit() { - s.txn.Commit() +func (s *StateRestore) Commit() error { + return s.txn.Commit() } // NodeRestore is used to restore a node diff --git a/nomad/state/state_store_events_test.go b/nomad/state/state_store_events_test.go index 08a5b88b770..146b792e972 100644 --- a/nomad/state/state_store_events_test.go +++ b/nomad/state/state_store_events_test.go @@ -27,7 +27,7 @@ func TestStateStore_Events_OnEvict(t *testing.T) { } s := TestStateStoreCfg(t, cfg) - _, err := s.EventPublisher() + _, err := s.EventBroker() require.NoError(t, err) // force 3 evictions @@ -85,7 +85,7 @@ func TestStateStore_Events_OnEvict_Missing(t *testing.T) { } s := TestStateStoreCfg(t, cfg) - _, err := s.EventPublisher() + _, err := s.EventBroker() require.NoError(t, err) getEvents := func() []*structs.Events { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1c0f345f57e..7fed591a4a4 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1368,7 +1368,7 @@ func TestStateStore_RestoreNode(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) @@ -2441,7 +2441,7 @@ func TestStateStore_RestoreJob(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() +
require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, job.ID) @@ -2711,7 +2711,7 @@ func TestStateStore_RestorePeriodicLaunch(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.PeriodicLaunchByID(ws, job.Namespace, job.ID) @@ -2743,7 +2743,7 @@ func TestStateStore_RestoreJobVersion(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, job.Version) @@ -2775,7 +2775,7 @@ func TestStateStore_RestoreDeployment(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.DeploymentByID(ws, d.ID) @@ -2815,7 +2815,7 @@ func TestStateStore_RestoreJobSummary(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.JobSummaryByID(ws, job.Namespace, job.ID) @@ -3617,7 +3617,7 @@ func TestStateStore_RestoreCSIPlugin(t *testing.T) { err = restore.CSIPluginRestore(plugin) require.NoError(err) - restore.Commit() + require.NoError(restore.Commit()) ws := memdb.NewWatchSet() out, err := state.CSIPluginByID(ws, plugin.ID) @@ -3731,7 +3731,7 @@ func TestStateStore_RestoreIndex(t *testing.T) { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) out, err := state.Index("jobs") if err != nil { @@ -4397,7 +4397,7 @@ func TestStateStore_RestoreEval(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.EvalByID(ws, eval.ID) @@ -6033,7 +6033,7 @@ func TestStateStore_RestoreAlloc(t *testing.T) { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.AllocByID(ws, alloc.ID) @@ -7701,7 +7701,7 @@ func TestStateStore_RestoreVaultAccessor(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.VaultAccessor(ws, a.Accessor) @@ -7902,7 +7902,7 @@ func TestStateStore_RestoreSITokenAccessor(t *testing.T) { err = restore.SITokenAccessorRestore(a1) r.NoError(err) - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() result, err := state.SITokenAccessor(ws, a1.AccessorID) @@ -8382,7 +8382,7 @@ func TestStateStore_RestoreACLPolicy(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.ACLPolicyByName(ws, policy.Name) @@ -8441,7 +8441,7 @@ func TestStateStore_RestoreACLToken(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - restore.Commit() + require.NoError(t, restore.Commit()) ws := memdb.NewWatchSet() out, err := state.ACLTokenByAccessorID(ws, token.AccessorID) @@ -8470,7 +8470,7 @@ func TestStateStore_SchedulerConfig(t *testing.T) { err = restore.SchedulerConfigRestore(schedConfig) require.Nil(err) - restore.Commit() + require.NoError(restore.Commit()) modIndex, out, err := state.SchedulerConfig() require.Nil(err) @@ -8510,7 +8510,7 @@ func TestStateStore_ClusterMetadataRestore(t *testing.T) { err = restore.ClusterMetadataRestore(meta) require.NoError(err) - 
restore.Commit() + require.NoError(restore.Commit()) out, err := state.ClusterMetadata(nil) require.NoError(err) @@ -8530,7 +8530,7 @@ func TestStateStore_RestoreScalingPolicy(t *testing.T) { err = restore.ScalingPolicyRestore(scalingPolicy) require.NoError(err) - restore.Commit() + require.NoError(restore.Commit()) ws := memdb.NewWatchSet() out, err := state.ScalingPolicyByID(ws, scalingPolicy.ID) @@ -9543,7 +9543,7 @@ func TestStateStore_RestoreScalingEvents(t *testing.T) { err = restore.ScalingEventsRestore(jobScalingEvents) require.NoError(err) - restore.Commit() + require.NoError(restore.Commit()) ws := memdb.NewWatchSet() out, _, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace, diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go index 42c9dc3a4c2..419b27f9837 100644 --- a/nomad/stream/event_buffer.go +++ b/nomad/stream/event_buffer.go @@ -50,19 +50,17 @@ type eventBuffer struct { head atomic.Value tail atomic.Value - maxSize int64 - maxItemTTL time.Duration - onEvict EvictCallbackFn + maxSize int64 + onEvict EvictCallbackFn } // newEventBuffer creates an eventBuffer ready for use. -func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackFn) *eventBuffer { +func newEventBuffer(size int64, onEvict EvictCallbackFn) *eventBuffer { zero := int64(0) b := &eventBuffer{ - maxSize: size, - size: &zero, - maxItemTTL: maxItemTTL, - onEvict: onEvict, + maxSize: size, + size: &zero, + onEvict: onEvict, } item := newBufferItem(&structs.Events{Index: 0, Events: nil}) @@ -77,7 +75,7 @@ func newEventBuffer(size int64, maxItemTTL time.Duration, onEvict EvictCallbackF // watchers. After calling append, the caller must not make any further // mutations to the events as they may have been exposed to subscribers in other // goroutines. Append only supports a single concurrent caller and must be -// externally synchronized with other Append, or prune calls. +// externally synchronized with other Append calls. func (b *eventBuffer) Append(events *structs.Events) { b.appendItem(newBufferItem(events)) } @@ -104,7 +102,7 @@ func (b *eventBuffer) appendItem(item *bufferItem) { } func newSentinelItem() *bufferItem { - return newBufferItem(&structs.Events{Index: 0, Events: nil}) + return newBufferItem(&structs.Events{}) } // advanceHead drops the current Head buffer item and notifies readers @@ -191,25 +189,6 @@ func (b *eventBuffer) Len() int { return int(atomic.LoadInt64(b.size)) } -// prune advances the head of the buffer until the head buffer item TTL -// is no longer expired. It should be externally synchronized as it mutates -// the buffer of items. -func (b *eventBuffer) prune() { - now := time.Now() - for { - head := b.Head() - if b.Len() == 0 { - return - } - - if now.Sub(head.createdAt) > b.maxItemTTL { - b.advanceHead() - } else { - return - } - } -} - // bufferItem represents a set of events published by a single raft operation. // The first item returned by a newly constructed buffer will have nil Events. // It is a sentinel value which is used to wait on the next events via Next. 
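For context on the event_buffer.go change above: eviction is now driven purely by buffer size rather than a TTL, with the OnEvict callback giving the state store its hook (eventBrokerEvict) to delete evicted events from go-memdb. Below is a minimal in-package sketch of that behavior, modeled on TestEventBuffer_OnEvict later in this diff; the test name and assertions are illustrative only and are not part of the change.

package stream

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

// Illustrative sketch, not part of this change: shows the size-based
// eviction that replaced TTL pruning. It lives in package stream because
// newEventBuffer is unexported.
func TestEventBuffer_EvictSketch(t *testing.T) {
	var evictedIndexes []uint64
	onEvict := func(events *structs.Events) {
		// In production this callback is StateStore.eventBrokerEvict, which
		// deletes the evicted entry from the durable go-memdb "events" table.
		evictedIndexes = append(evictedIndexes, events.Index)
	}

	// Buffer holds at most 2 events (plus the starting sentinel item).
	b := newEventBuffer(2, onEvict)

	// Appending past maxSize advances the head, evicting the oldest items
	// and invoking onEvict once for each dropped item.
	for i := 1; i <= 4; i++ {
		b.Append(&structs.Events{
			Index:  uint64(i),
			Events: []structs.Event{{Topic: structs.TopicNode, Index: uint64(i)}},
		})
	}

	require.NotEmpty(t, evictedIndexes)
	require.True(t, b.Len() <= 2)
}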
diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go index 84f0b852473..45501d12372 100644 --- a/nomad/stream/event_buffer_test.go +++ b/nomad/stream/event_buffer_test.go @@ -17,7 +17,7 @@ func TestEventBufferFuzz(t *testing.T) { nReaders := 1000 nMessages := 1000 - b := newEventBuffer(1000, DefaultTTL, nil) + b := newEventBuffer(1000, nil) // Start a write goroutine that will publish 10000 messages with sequential // indexes and some jitter in timing (to allow clients to "catch up" and block @@ -85,7 +85,7 @@ func TestEventBufferFuzz(t *testing.T) { } func TestEventBuffer_Slow_Reader(t *testing.T) { - b := newEventBuffer(10, DefaultTTL, nil) + b := newEventBuffer(10, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -114,7 +114,7 @@ func TestEventBuffer_Slow_Reader(t *testing.T) { } func TestEventBuffer_Size(t *testing.T) { - b := newEventBuffer(100, DefaultTTL, nil) + b := newEventBuffer(100, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -126,11 +126,11 @@ func TestEventBuffer_Size(t *testing.T) { require.Equal(t, 10, b.Len()) } -// TestEventBuffer_Prune_AllOld tests the behavior when all items -// are past their TTL, the event buffer should prune down to the last message -// and hold onto the last item. -func TestEventBuffer_Prune_AllOld(t *testing.T) { - b := newEventBuffer(100, 1*time.Second, nil) +// TestEventBuffer_Emptying_Buffer tests that when all items +// are removed, the event buffer advances its head down to the last message +// and inserts a placeholder sentinel value. +func TestEventBuffer_Emptying_Buffer(t *testing.T) { + b := newEventBuffer(10, nil) for i := 0; i < 10; i++ { e := structs.Event{ @@ -141,11 +141,11 @@ func TestEventBuffer_Emptying_Buffer(t *testing.T) { require.Equal(t, 10, int(b.Len())) - time.Sleep(1 * time.Second) - - // prune old messages, which will bring the event buffer down + // empty the buffer, which will bring the event buffer down // to a single sentinel value - b.prune() + for i := 0; i < 11; i++ { + b.advanceHead() + } // head and tail are now a sentinel value head := b.Head() @@ -203,7 +203,7 @@ func TestEventBuffer_StartAt_CurrentIdx_Past_Start(t *testing.T) { } // buffer starts at index 11 goes to 100 - b := newEventBuffer(100, 1*time.Hour, nil) + b := newEventBuffer(100, nil) for i := 11; i <= 100; i++ { e := structs.Event{ @@ -226,7 +226,7 @@ func TestEventBuffer_OnEvict(t *testing.T) { testOnEvict := func(events *structs.Events) { close(called) } - b := newEventBuffer(2, DefaultTTL, testOnEvict) + b := newEventBuffer(2, testOnEvict) // start at 1 since new event buffer is built with a starting sentinel value for i := 1; i < 4; i++ { diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go index f28131dddec..bcf8a8fc52b 100644 --- a/nomad/stream/event_publisher.go +++ b/nomad/stream/event_publisher.go @@ -2,9 +2,11 @@ package stream import ( "context" + "fmt" "sync" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-hclog" @@ -14,48 +16,34 @@ const ( DefaultTTL = 1 * time.Hour ) -type EventPublisherCfg struct { +type EventBrokerCfg struct { EventBufferSize int64 - EventBufferTTL time.Duration Logger hclog.Logger OnEvict EvictCallbackFn } -type EventPublisher struct { - // lock protects the eventbuffer - lock sync.Mutex +type EventBroker struct { + // mu protects the event buffer and subscriptions + mu sync.Mutex // eventBuf stores a configurable amount of events in memory eventBuf
*eventBuffer - logger hclog.Logger - subscriptions *subscriptions // publishCh is used to send messages from an active txn to a goroutine which // publishes events, so that publishing can happen asynchronously from // the Commit call in the FSM hot path. publishCh chan *structs.Events -} - -type subscriptions struct { - // lock for byToken. If both subscription.lock and EventPublisher.lock need - // to be held, EventPublisher.lock MUST always be acquired first. - lock sync.RWMutex - // byToken is an mapping of active Subscriptions indexed by a token and - // a pointer to the request. - // When the token is modified all subscriptions under that token will be - // reloaded. - // A subscription may be unsubscribed by using the pointer to the request. - byToken map[string]map[*SubscribeRequest]*Subscription + logger hclog.Logger } -func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublisher { - if cfg.EventBufferTTL == 0 { - cfg.EventBufferTTL = 1 * time.Hour - } - +// NewEventBroker returns an EventBroker for publishing change events. +// A goroutine is run in the background to publish events to an event buffer. +// Cancelling the context will shut down the goroutine to free resources and stop +// all publishing. +func NewEventBroker(ctx context.Context, cfg EventBrokerCfg) *EventBroker { if cfg.Logger == nil { cfg.Logger = hclog.NewNullLogger() } @@ -65,9 +53,9 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish cfg.EventBufferSize = 100 } - buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL, cfg.OnEvict) - e := &EventPublisher{ - logger: cfg.Logger.Named("event_publisher"), + buffer := newEventBuffer(cfg.EventBufferSize, cfg.OnEvict) + e := &EventBroker{ + logger: cfg.Logger.Named("event_broker"), eventBuf: buffer, publishCh: make(chan *structs.Events, 64), subscriptions: &subscriptions{ @@ -80,21 +68,35 @@ func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublish return e } -func (e *EventPublisher) Len() int { +// Len returns the current length of the event buffer +func (e *EventBroker) Len() int { + e.mu.Lock() + defer e.mu.Unlock() return e.eventBuf.Len() } // Publish events to all subscribers of the event Topic. -func (e *EventPublisher) Publish(events *structs.Events) { +func (e *EventBroker) Publish(events *structs.Events) { if len(events.Events) > 0 { e.publishCh <- events } } -// Subscribe returns a new Subscription for a given request. -func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) { - e.lock.Lock() - defer e.lock.Unlock() +// Subscribe returns a new Subscription for a given request. A Subscription +// will receive an initial empty currentItem value which points to the first item +// in the buffer. This allows the new subscription to call Next() without first checking +// for the current Item. +// +// A Subscription will start at the requested index, or as close as possible to +// the requested index if it is no longer in the buffer. If StartExactlyAtIndex is +// set and the index is no longer in the buffer or not yet in the buffer an error +// will be returned. +// +// When a caller is finished with the subscription it must call Subscription.Unsubscribe +// to free ACL tracking resources.
TODO(drew) ACL tracking +func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error) { + e.mu.Lock() + defer e.mu.Unlock() var head *bufferItem var offset int @@ -103,8 +105,11 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) } else { head = e.eventBuf.Head() } - if offset > 0 { - e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Events.Index)) + if offset > 0 && req.StartExactlyAtIndex { + return nil, fmt.Errorf("requested index not in buffer") + } else if offset > 0 { + metrics.SetGauge([]string{"nomad", "event_broker", "subscription", "request_offset"}, float32(offset)) + e.logger.Debug("requested index no longer in buffer", "requested", int(req.Index), "closest", int(head.Events.Index)) } // Empty head so that calling Next on sub @@ -112,17 +117,18 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) start.link.next.Store(head) close(start.link.ch) - sub := newSubscription(req, start, e.subscriptions.unsubscribe(req)) + sub := newSubscription(req, start, e.subscriptions.unsubscribeFn(req)) e.subscriptions.add(req, sub) return sub, nil } -func (e *EventPublisher) CloseAll() { +// CloseAll closes all subscriptions +func (e *EventBroker) CloseAll() { e.subscriptions.closeAll() } -func (e *EventPublisher) handleUpdates(ctx context.Context) { +func (e *EventBroker) handleUpdates(ctx context.Context) { for { select { case <-ctx.Done(): @@ -135,16 +141,29 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) { } // sendEvents sends the given events to the publishers event buffer. -func (e *EventPublisher) sendEvents(update *structs.Events) { - e.lock.Lock() - defer e.lock.Unlock() +func (e *EventBroker) sendEvents(update *structs.Events) { + e.mu.Lock() + defer e.mu.Unlock() e.eventBuf.Append(update) } +type subscriptions struct { + // mu for byToken. If both subscription.mu and EventBroker.mu need + // to be held, EventBroker mutex MUST always be acquired first. + mu sync.RWMutex + + // byToken is a mapping of active Subscriptions indexed by a token and + // a pointer to the request. + // When the token is modified all subscriptions under that token will be + // reloaded. + // A subscription may be unsubscribed by using the pointer to the request. + byToken map[string]map[*SubscribeRequest]*Subscription +} + func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { - s.lock.Lock() - defer s.lock.Unlock() + s.mu.Lock() + defer s.mu.Unlock() subsByToken, ok := s.byToken[req.Token] if !ok { @@ -155,8 +174,8 @@ func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { } func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { - s.lock.RLock() - defer s.lock.RUnlock() + s.mu.RLock() + defer s.mu.RUnlock() for _, secretID := range tokenSecretIDs { if subs, ok := s.byToken[secretID]; ok { @@ -167,20 +186,29 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { } } -// unsubscribe returns a function that the subscription will call to remove +// unsubscribeFn returns a function that the subscription will call to remove // itself from the subsByToken. // This function is returned as a closure so that the caller doesn't need to keep -// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the +// track of the SubscriptionRequest, and can not accidentally call unsubscribeFn with the // wrong pointer.
-func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() { +func (s *subscriptions) unsubscribeFn(req *SubscribeRequest) func() { return func() { - s.lock.Lock() - defer s.lock.Unlock() + s.mu.Lock() + defer s.mu.Unlock() subsByToken, ok := s.byToken[req.Token] if !ok { return } + + sub := subsByToken[req] + if sub == nil { + return + } + + // close the subscription + sub.forceClose() + delete(subsByToken, req) if len(subsByToken) == 0 { delete(s.byToken, req.Token) @@ -189,8 +217,8 @@ func (s *subscriptions) closeAll() { - s.lock.Lock() - defer s.lock.Unlock() + s.mu.Lock() + defer s.mu.Unlock() for _, byRequest := range s.byToken { for _, sub := range byRequest { diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go index af92d059566..0587a81f520 100644 --- a/nomad/stream/event_publisher_test.go +++ b/nomad/stream/event_publisher_test.go @@ -2,6 +2,7 @@ package stream import ( "context" + "sync/atomic" "testing" "time" @@ -10,16 +11,16 @@ import ( "github.com/stretchr/testify/require" ) -func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { +func TestEventBroker_PublishChangesAndSubscribe(t *testing.T) { subscription := &SubscribeRequest{ Topics: map[structs.Topic][]string{ - "Test": []string{"sub-key"}, + "Test": {"sub-key"}, }, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - publisher := NewEventPublisher(ctx, EventPublisherCfg{EventBufferSize: 100, EventBufferTTL: DefaultTTL}) + publisher := NewEventBroker(ctx, EventBrokerCfg{EventBufferSize: 100}) sub, err := publisher.Subscribe(subscription) require.NoError(t, err) eventCh := consumeSubscription(ctx, sub) @@ -59,11 +60,11 @@ func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { require.Equal(t, expected, result.Events) } -func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { +func TestEventBroker_ShutdownClosesSubscriptions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - publisher := NewEventPublisher(ctx, EventPublisherCfg{}) + publisher := NewEventBroker(ctx, EventBrokerCfg{}) sub1, err := publisher.Subscribe(&SubscribeRequest{}) require.NoError(t, err) @@ -82,6 +83,32 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { require.Equal(t, err, ErrSubscriptionClosed) } +// TestEventBroker_EmptyReqToken_DistinctSubscriptions tests subscription +// handling behavior when ACLs are disabled (request Token is empty). +// Subscriptions are mapped by their request token. When that token is empty, +// the subscriptions should still be handled independently of each other when +// unsubscribing.
+func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + publisher := NewEventBroker(ctx, EventBrokerCfg{}) + + // first subscription, empty token + sub1, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + defer sub1.Unsubscribe() + + // second subscription, empty token + sub2, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + require.NotNil(t, sub2) + + sub1.Unsubscribe() + + require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state)) +} + func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { eventCh := make(chan subNextResult, 1) go func() { diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index cd3befc7ea1..7e7ad092810 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -1,81 +1,63 @@ package stream import ( - "bytes" "context" "encoding/json" "fmt" - "sync" "time" "github.com/hashicorp/nomad/nomad/structs" ) var ( - // NDJsonHeartbeat is the NDJson to send as a heartbeat + // JsonHeartbeat is an empty JSON object to send as a heartbeat // Avoids creating many heartbeat instances - NDJsonHeartbeat = &structs.NDJson{Data: []byte("{}\n")} + JsonHeartbeat = &structs.EventJson{Data: []byte("{}")} ) -// NDJsonStream is used to send new line delimited JSON and heartbeats +// JsonStream is used to send new line delimited JSON and heartbeats // to a destination (out channel) -type NDJsonStream struct { - out chan<- *structs.NDJson +type JsonStream struct { + // ctx is a passed-in context used to notify the json stream + // when it should terminate + ctx context.Context + + outCh chan *structs.EventJson // heartbeat is the interval to send heartbeat messages to keep a connection // open. - heartbeat *time.Ticker - - publishCh chan structs.NDJson - exitCh chan struct{} - - l sync.Mutex - running bool + heartbeatTick *time.Ticker } -// NewNNewNDJsonStream creates a new NDJson stream that will output NDJson structs -// to the passed output channel -func NewNDJsonStream(out chan<- *structs.NDJson, heartbeat time.Duration) *NDJsonStream { - return &NDJsonStream{ - out: out, - heartbeat: time.NewTicker(heartbeat), - exitCh: make(chan struct{}), - publishCh: make(chan structs.NDJson), +// NewJsonStream creates a new json stream that will output EventJson structs +// to the passed output channel. The constructor starts a goroutine +// that begins heartbeating at the configured interval.
+func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream { + s := &JsonStream{ + ctx: ctx, + outCh: make(chan *structs.EventJson, 10), + heartbeatTick: time.NewTicker(heartbeat), } -} -// Run starts a long lived goroutine that handles sending -// heartbeats and processed json objects to the streams out channel as well -func (n *NDJsonStream) Run(ctx context.Context) { - n.l.Lock() - if n.running { - return - } - n.running = true - n.l.Unlock() + go s.heartbeat() - go n.run(ctx) + return s } -func (n *NDJsonStream) run(ctx context.Context) { - defer func() { - n.l.Lock() - n.running = false - n.l.Unlock() - close(n.exitCh) - }() +func (n *JsonStream) OutCh() chan *structs.EventJson { + return n.outCh +} +func (n *JsonStream) heartbeat() { for { select { - case <-ctx.Done(): + case <-n.ctx.Done(): return - case msg := <-n.publishCh: - n.out <- msg.Copy() - case <-n.heartbeat.C: + case <-n.heartbeatTick.C: // Send a heartbeat frame select { - case n.out <- NDJsonHeartbeat: - case <-ctx.Done(): + case n.outCh <- JsonHeartbeat: + case <-n.ctx.Done(): return } } @@ -84,19 +66,20 @@ func (n *NDJsonStream) run(ctx context.Context) { // Send encodes an object into Newline delimited json. An error is returned // if json encoding fails or if the stream is no longer running. -func (n *NDJsonStream) Send(obj interface{}) error { - n.l.Lock() - defer n.l.Unlock() +func (n *JsonStream) Send(v interface{}) error { + if n.ctx.Err() != nil { + return n.ctx.Err() + } - buf := bytes.NewBuffer(nil) - if err := json.NewEncoder(buf).Encode(obj); err != nil { - return fmt.Errorf("marshaling json for stream: %w", err) + buf, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("error marshaling json for stream: %w", err) } select { - case n.publishCh <- structs.NDJson{Data: buf.Bytes()}: - case <-n.exitCh: - return fmt.Errorf("stream is no longer running") + case <-n.ctx.Done(): + return fmt.Errorf("stream is no longer running: %w", n.ctx.Err()) + case n.outCh <- &structs.EventJson{Data: buf}: } return nil diff --git a/nomad/stream/ndjson_test.go b/nomad/stream/ndjson_test.go index 589cde1a719..0c7c4de787d 100644 --- a/nomad/stream/ndjson_test.go +++ b/nomad/stream/ndjson_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -14,15 +13,14 @@ type testObj struct { Name string `json:"name"` } -func TestNDJson(t *testing.T) { +func TestJsonStream(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *structs.NDJson) - s := NewNDJsonStream(out, 1*time.Second) - s.Run(ctx) + s := NewJsonStream(ctx, 1*time.Second) + out := s.OutCh() require.NoError(t, s.Send(testObj{Name: "test"})) @@ -30,25 +28,31 @@ func TestNDJson(t *testing.T) { var expected bytes.Buffer expected.Write([]byte(`{"name":"test"}`)) - expected.Write([]byte("\n")) require.Equal(t, expected.Bytes(), out1.Data) select { - case _ = <-out: - t.Fatalf("Did not expect another message") + case msg := <-out: + require.Failf(t, "Did not expect another message", "%#v", msg) case <-time.After(100 * time.Millisecond): } + + require.NoError(t, s.Send(testObj{Name: "test2"})) + + out2 := <-out + expected.Reset() + + expected.Write([]byte(`{"name":"test2"}`)) + require.Equal(t, expected.Bytes(), out2.Data) + } -func TestNDJson_Send_After_Stop(t *testing.T) { +func TestJson_Send_After_Stop(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer
cancel() - out := make(chan *structs.NDJson) - s := NewNDJsonStream(out, 1*time.Second) - s.Run(ctx) + s := NewJsonStream(ctx, 1*time.Second) // stop the stream cancel() @@ -57,17 +61,16 @@ func TestNDJson_Send_After_Stop(t *testing.T) { require.Error(t, s.Send(testObj{})) } -func TestNDJson_HeartBeat(t *testing.T) { +func TestJson_HeartBeat(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - out := make(chan *structs.NDJson) - s := NewNDJsonStream(out, 10*time.Millisecond) - s.Run(ctx) + s := NewJsonStream(ctx, 10*time.Millisecond) + out := s.OutCh() heartbeat := <-out - require.Equal(t, NDJsonHeartbeat, heartbeat) + require.Equal(t, JsonHeartbeat, heartbeat) } diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index b85838f3d9c..70b133a6382 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -9,7 +9,6 @@ import ( ) const ( - AllKeys = "*" // subscriptionStateOpen is the default state of a subscription. An open // subscription may receive new events. subscriptionStateOpen uint32 = 0 @@ -25,7 +24,7 @@ const ( var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe") type Subscription struct { - // state is accessed atomically 0 means open, 1 means closed with reload + // state must be accessed atomically; 0 means open, 1 means closed with reload state uint32 req *SubscribeRequest @@ -35,10 +34,10 @@ type Subscription struct { currentItem *bufferItem // forceClosed is closed when forceClose is called. It is used by - // EventPublisher to cancel Next(). + // EventBroker to cancel Next(). forceClosed chan struct{} - // unsub is a function set by EventPublisher that is called to free resources + // unsub is a function set by EventBroker that is called to free resources // when the subscription is no longer needed. // It must be safe to call the function from multiple goroutines and the function // must be idempotent. @@ -51,6 +50,12 @@ type SubscribeRequest struct { Namespace string Topics map[structs.Topic][]string + + // StartExactlyAtIndex specifies whether a subscription needs to + // start exactly at the requested Index.
If set to false, + // the closest index in the buffer will be returned if there is not + // an exact match + StartExactlyAtIndex bool } func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription { @@ -124,11 +129,11 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { var count int for _, e := range events { - _, allTopics := req.Topics[AllKeys] + _, allTopics := req.Topics[structs.TopicAll] if _, ok := req.Topics[e.Topic]; ok || allTopics { var keys []string if allTopics { - keys = req.Topics[AllKeys] + keys = req.Topics[structs.TopicAll] } else { keys = req.Topics[e.Topic] } @@ -136,9 +141,7 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { continue } for _, k := range keys { - // if req.Namespace != "" && e.Namespace != "" && e.Namespace == - // if e.Namespace != "" && e.Namespace - if e.Key == k || k == AllKeys { + if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) { count++ } } @@ -156,11 +159,11 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { // Return filtered events result := make([]structs.Event, 0, count) for _, e := range events { - _, allTopics := req.Topics[AllKeys] + _, allTopics := req.Topics[structs.TopicAll] if _, ok := req.Topics[e.Topic]; ok || allTopics { var keys []string if allTopics { - keys = req.Topics[AllKeys] + keys = req.Topics[structs.TopicAll] } else { keys = req.Topics[e.Topic] } @@ -169,7 +172,7 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { continue } for _, k := range keys { - if e.Key == k || k == AllKeys { + if e.Key == k || k == string(structs.TopicAll) || filterKeyContains(e.FilterKeys, k) { result = append(result, e) } } @@ -177,3 +180,12 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { } return result } + +func filterKeyContains(filterKeys []string, key string) bool { + for _, fk := range filterKeys { + if fk == key { + return true + } + } + return false +} diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go index a83be880986..c54bc4fa557 100644 --- a/nomad/stream/subscription_test.go +++ b/nomad/stream/subscription_test.go @@ -8,10 +8,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestSubscription(t *testing.T) { - -} - func TestFilter_AllTopics(t *testing.T) { events := make([]structs.Event, 0, 5) events = append(events, structs.Event{Topic: "Test", Key: "One"}, structs.Event{Topic: "Test", Key: "Two"}) @@ -112,3 +108,22 @@ func TestFilter_Namespace(t *testing.T) { require.Equal(t, cap(actual), 2) } + +func TestFilter_FilterKeys(t *testing.T) { + events := make([]structs.Event, 0, 5) + events = append(events, structs.Event{Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[structs.Topic][]string{ + "Test": {"extra-key"}, + }, + Namespace: "foo", + } + actual := filter(req, events) + expected := []structs.Event{ + {Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, + } + require.Equal(t, expected, actual) + + require.Equal(t, cap(actual), 1) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 75db4c35cab..143670eee16 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10708,8 +10708,7 @@ type ACLTokenUpsertResponse struct { WriteMeta } -// EEventStreamRequest is used to stream events from a 
servers -// EventPublisher +// EventStreamRequest is used to stream events from a server's EventBroker type EventStreamRequest struct { Topics map[Topic][]string Index int @@ -10719,7 +10718,7 @@ type EventStreamRequest struct { type EventStreamWrapper struct { Error *RpcError - Event *NDJson + Event *EventJson } // RpcError is used for serializing errors with a potential error code @@ -10741,28 +10740,54 @@ func (r *RpcError) Error() string { type Topic string +const ( + TopicDeployment Topic = "Deployment" + TopicEval Topic = "Eval" + TopicAlloc Topic = "Alloc" + TopicJob Topic = "Job" + TopicNode Topic = "Node" + TopicAll Topic = "*" +) + +// Event represents a change in Nomad's state. type Event struct { - Topic Topic - Type string - Key string - Namespace string + // Topic represents the primary object for the event + Topic Topic + + // Type is a short string representing the reason for the event + Type string + + // Key is the primary identifier of the Event, the involved object's ID + Key string + + // Namespace is the namespace of the object. If the object is not namespace + // aware (Node) it is left blank + Namespace string + + // FilterKeys are a set of additional related keys that are used to include + // events during filtering. FilterKeys []string - Index uint64 - Payload interface{} + + // Index is the raft index that corresponds to the event + Index uint64 + + // Payload is the Event itself; see state/events.go for a list of events + Payload interface{} } +// Events is a wrapper that contains a set of events for a given index. type Events struct { Index uint64 Events []Event } -// NNDJson is a wrapper for a Newline Delimited JSON object -type NDJson struct { +// EventJson is a wrapper for a JSON object +type EventJson struct { Data []byte } -func (j *NDJson) Copy() *NDJson { - n := new(NDJson) +func (j *EventJson) Copy() *EventJson { + n := new(EventJson) *n = *j n.Data = make([]byte, len(j.Data)) copy(n.Data, j.Data) diff --git a/nomad/testing.go b/nomad/testing.go index edb2d1c8e49..3b9d4bf4b8b 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -47,7 +47,7 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) { config.Logger = testlog.HCLogger(t) config.Build = version.Version + "+unittest" config.DevMode = true - config.EnableEventPublisher = true + config.EnableEventBroker = true config.BootstrapExpect = 1 nodeNum := atomic.AddUint32(&nodeNumber, 1) config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum) diff --git a/vendor/github.com/hashicorp/nomad/api/event.go b/vendor/github.com/hashicorp/nomad/api/event.go index dce3c265fe8..f89e222848f 100644 --- a/vendor/github.com/hashicorp/nomad/api/event.go +++ b/vendor/github.com/hashicorp/nomad/api/event.go @@ -6,7 +6,7 @@ import ( "fmt" ) -// Ebvents is a set of events for a corresponding index. Events returned for the +// Events is a set of events for a corresponding index. Events returned for the // index depend on which topics are subscribed to when a request is made. type Events struct { Index uint64 @@ -47,11 +47,11 @@ func (c *Client) EventStream() *EventStream { // Stream establishes a new subscription to Nomad's event stream and streams // results back to the returned channel.
func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, index uint64, q *QueryOptions) (<-chan *Events, error) { - r, err := e.client.newRequest("GET", "/v1/event/stream") if err != nil { return nil, err } + q = q.WithContext(ctx) r.setQueryOptions(q) // Build topic query params @@ -78,11 +78,11 @@ func (e *EventStream) Stream(ctx context.Context, topics map[Topic][]string, ind // Decode next newline delimited json of events var events Events if err := dec.Decode(&events); err != nil { + // set error and fallthrough to + // select eventsCh events = Events{Err: err} - eventsCh <- &events - return } - if events.IsHeartbeat() { + if events.Err == nil && events.IsHeartbeat() { continue }
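For reference, a minimal consumer sketch of the client API changed above; it assumes a reachable Nomad agent and uses only calls visible in this diff: EventStream().Stream with a topic filter, context cancellation, and the Events.Err field. The topic name and print format are illustrative only.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Assumes a reachable agent; api.DefaultConfig honors NOMAD_ADDR.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Subscribe to all evaluation events, starting at index 0.
	topics := map[api.Topic][]string{
		"Eval": {"*"},
	}

	streamCh, err := client.EventStream().Stream(ctx, topics, 0, &api.QueryOptions{})
	if err != nil {
		log.Fatal(err)
	}

	// Heartbeats are filtered inside Stream, so the channel yields only real
	// event batches or a terminal error; cancelling ctx closes the channel
	// (see TestEvent_Stream_CloseCtx in this diff).
	for events := range streamCh {
		if events.Err != nil {
			log.Fatal(events.Err)
		}
		for _, e := range events.Events {
			fmt.Printf("index=%d topic=%s\n", events.Index, e.Topic)
		}
	}
}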