Skip to content

Commit

Permalink
events: fix wildcard namespace handling (#10935)
Browse files Browse the repository at this point in the history
This fixes a bug in the event stream API where it currently interprets
namespace=* as an actual namespace, not a wildcard. When Nomad parses
incoming requests, it sets namespace to default if not specified, which
means the request namespace will never be an empty string, which is what
the event subscription was checking for. This changes the conditional
logic to check for a wildcard namespace instead of an empty one.

It also updates some event tests to include the default namespace in the
subscription to match current behavior.

Fixes #10903
  • Loading branch information
isabeldepapel authored and schmichael committed Oct 5, 2021
1 parent 3c1aaf9 commit 266bd03
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .changelog/10935.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
events: Fixed wildcard namespace handling
```
2 changes: 2 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3378,6 +3378,7 @@ func TestFSM_ACLEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
tc.reqTopic: {"*"},
},
Namespace: "default",
}

sub, err := broker.Subscribe(subReq)
Expand Down Expand Up @@ -3431,6 +3432,7 @@ func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) {
Topics: map[structs.Topic][]string{
structs.TopicJob: {"*"},
},
Namespace: "default",
}

sub, err := broker.Subscribe(subReq)
Expand Down
1 change: 1 addition & 0 deletions nomad/state/deployment_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event {
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Namespace: "default",
Index: index,
StartExactlyAtIndex: true,
})
Expand Down
30 changes: 29 additions & 1 deletion nomad/stream/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,27 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to evals in all namespaces and removed access",
policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}),
policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
shouldUnsubscribe: true,
event: structs.Event{
Topic: structs.TopicEvaluation,
Type: structs.TypeEvalUpdated,
Namespace: "foo",
Payload: structs.EvaluationEvent{
Evaluation: &structs.Evaluation{
ID: "some-id",
},
},
},
policyEvent: structs.Event{
Topic: structs.TopicACLToken,
Type: structs.TypeACLTokenUpserted,
Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}),
},
},
{
desc: "subscribed to deployments and no access change",
policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}),
Expand Down Expand Up @@ -467,11 +488,18 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) {
publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{})
require.NoError(t, err)

var ns string
if tc.event.Namespace != "" {
ns = tc.event.Namespace
} else {
ns = structs.DefaultNamespace
}

sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.event.Topic: {"*"},
},
Namespace: structs.DefaultNamespace,
Namespace: ns,
Token: secretID,
})

Expand Down
5 changes: 3 additions & 2 deletions nomad/stream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,15 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event {

allTopicKeys := req.Topics[structs.TopicAll]

if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
// Return all events if subscribed to all namespaces and all topics
if req.Namespace == "*" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) {
return events
}

var result []structs.Event

for _, event := range events {
if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace {
if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace {
continue
}

Expand Down
23 changes: 23 additions & 0 deletions nomad/stream/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ func TestFilter_Namespace(t *testing.T) {
require.Equal(t, 2, cap(actual))
}

func TestFilter_NamespaceAll(t *testing.T) {
events := make([]structs.Event, 0, 5)
events = append(events,
structs.Event{Topic: "Test", Key: "One", Namespace: "foo"},
structs.Event{Topic: "Test", Key: "Two", Namespace: "bar"},
structs.Event{Topic: "Test", Key: "Three", Namespace: "default"},
)

req := &SubscribeRequest{
Topics: map[structs.Topic][]string{
"*": {"*"},
},
Namespace: "*",
}
actual := filter(req, events)
expected := []structs.Event{
{Topic: "Test", Key: "One", Namespace: "foo"},
{Topic: "Test", Key: "Two", Namespace: "bar"},
{Topic: "Test", Key: "Three", Namespace: "default"},
}
require.Equal(t, expected, actual)
}

func TestFilter_FilterKeys(t *testing.T) {
events := make([]structs.Event, 0, 5)
events = append(events, structs.Event{Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Test", Key: "Two"})
Expand Down
10 changes: 8 additions & 2 deletions website/content/api-docs/events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ by default, requiring a management token.

- `namespace` `(string: "default")` - Specifies the target namespace to filter
on. Specifying `*` includes all namespaces for event types that support
namespaces.
namespaces. If you specify all namespaces (`*`) you'll either need a management
token, or an ACL Policy that explicitly applies to all namespaces (`*`).

- `topic` `(topic:filter_key: "*:*")` - Specifies a topic to subscribe to and
filter on. The default is to subscribe to all topics. Multiple topics may be
Expand Down Expand Up @@ -96,10 +97,15 @@ by default, requiring a management token.
### Sample Request

```shell-session
# Subscribe to all events and topics
# Subscribe to all events and topics in the default namespace
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream
```

```shell-session
# Subscribe to all events and topics in all namespaces
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?namespace=*
```

```shell-session
# Start at index 100 and subscribe to all Evaluation events
$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?index=100&topic=Evaluation
Expand Down

0 comments on commit 266bd03

Please sign in to comment.