Skip to content

Commit

Permalink
events: Add support for multi-namespace subscriptions (#22540)
Browse files Browse the repository at this point in the history
Events from multiple namespaces can be subscribed to via
glob patterns passed to the subscription.

This does not do policy enforcement yet -- that will come in PR soon.

I tested this manually as well by pulling it into Vault Enterprise
so I could create namespaces and check that subscriptions work as
expected.

Co-authored-by: Tom Proctor <[email protected]>
  • Loading branch information
Christopher Swenson and tomhjp authored Aug 25, 2023
1 parent f143f6a commit 3e900fd
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 21 deletions.
3 changes: 3 additions & 0 deletions changelog/22540.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
events: Allow subscriptions to multiple namespaces
```
44 changes: 39 additions & 5 deletions command/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (

type EventsSubscribeCommands struct {
*BaseCommand

namespaces []string
}

func (c *EventsSubscribeCommands) Synopsis() string {
Expand All @@ -31,10 +33,11 @@ func (c *EventsSubscribeCommands) Synopsis() string {

func (c *EventsSubscribeCommands) Help() string {
helpText := `
Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType
Usage: vault events subscribe [-namespaces=ns1] [-timeout=XYZs] eventType
Subscribe to events of the given event type (topic). The events will be
output to standard out.
Subscribe to events of the given event type (topic), which may be a glob
pattern (with "*"" treated as a wildcard). The events will be sent to
standard out.
The output will be a JSON object serialized using the default protobuf
JSON serialization format, with one line per event received.
Expand All @@ -44,7 +47,19 @@ Usage: vault events subscribe [-format=json] [-timeout=XYZs] eventType

func (c *EventsSubscribeCommands) Flags() *FlagSets {
set := c.flagSet(FlagSetHTTP)

f := set.NewFlagSet("Subscribe Options")
f.StringSliceVar(&StringSliceVar{
Name: "namespaces",
Usage: `Specifies one or more patterns of additional child namespaces
to subscribe to. The namespace of the request is automatically
prepended, so specifying 'ns2' when the request is in the 'ns1'
namespace will result in subscribing to 'ns1/ns2', in addition to
'ns1'. Patterns can include "*" characters to indicate
wildcards. The default is to subscribe only to the request's
namespace.`,
Default: []string{},
Target: &c.namespaces,
})
return set
}

Expand Down Expand Up @@ -88,6 +103,22 @@ func (c *EventsSubscribeCommands) Run(args []string) int {
return 0
}

// cleanNamespace removes leading and trailing space and /'s from the namespace path.
func cleanNamespace(ns string) string {
ns = strings.TrimSpace(ns)
ns = strings.Trim(ns, "/")
return ns
}

// cleanNamespaces removes leading and trailing space and /'s from the namespace paths.
func cleanNamespaces(namespaces []string) []string {
cleaned := make([]string, len(namespaces))
for i, ns := range namespaces {
cleaned[i] = cleanNamespace(ns)
}
return cleaned
}

func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path string) error {
r := client.NewRequest("GET", "/v1/"+path)
u := r.URL
Expand All @@ -98,9 +129,12 @@ func (c *EventsSubscribeCommands) subscribeRequest(client *api.Client, path stri
}
q := u.Query()
q.Set("json", "true")
if len(c.namespaces) > 0 {
q["namespaces"] = cleanNamespaces(c.namespaces)
}
u.RawQuery = q.Encode()
client.AddHeader("X-Vault-Token", client.Token())
client.AddHeader("X-Vault-Namesapce", client.Namespace())
client.AddHeader("X-Vault-Namespace", client.Namespace())
ctx := context.Background()

// Follow redirects in case our request if our request is forwarded to the leader.
Expand Down
36 changes: 27 additions & 9 deletions http/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/http"
"path"
"strconv"
"strings"
"time"
Expand All @@ -22,21 +23,21 @@ import (
)

type eventSubscribeArgs struct {
ctx context.Context
logger hclog.Logger
events *eventbus.EventBus
ns *namespace.Namespace
pattern string
conn *websocket.Conn
json bool
ctx context.Context
logger hclog.Logger
events *eventbus.EventBus
namespacePatterns []string
pattern string
conn *websocket.Conn
json bool
}

// handleEventsSubscribeWebsocket runs forever, returning a websocket error code and reason
// only if the connection closes or there was an error.
func handleEventsSubscribeWebsocket(args eventSubscribeArgs) (websocket.StatusCode, string, error) {
ctx := args.ctx
logger := args.logger
ch, cancel, err := args.events.Subscribe(ctx, args.ns, args.pattern)
ch, cancel, err := args.events.SubscribeMultipleNamespaces(ctx, args.namespacePatterns, args.pattern)
if err != nil {
logger.Info("Error subscribing", "error", err)
return websocket.StatusUnsupportedData, "Error subscribing", nil
Expand Down Expand Up @@ -123,6 +124,8 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
}

namespacePatterns := r.URL.Query()["namespaces"]
namespacePatterns = prependNamespacePatterns(namespacePatterns, ns)
conn, err := websocket.Accept(w, r, nil)
if err != nil {
logger.Info("Could not accept as websocket", "error", err)
Expand All @@ -143,7 +146,7 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
}()

closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), ns, pattern, conn, json})
closeStatus, closeReason, err := handleEventsSubscribeWebsocket(eventSubscribeArgs{ctx, logger, core.Events(), namespacePatterns, pattern, conn, json})
if err != nil {
closeStatus = websocket.CloseStatus(err)
if closeStatus == -1 {
Expand All @@ -163,3 +166,18 @@ func handleEventsSubscribe(core *vault.Core, req *logical.Request) http.Handler
}
})
}

// prependNamespacePatterns prepends the request namespace to the namespace patterns,
// and also adds the request namespace to the list.
func prependNamespacePatterns(patterns []string, requestNamespace *namespace.Namespace) []string {
prepend := strings.Trim(requestNamespace.Path, "/")
newPatterns := make([]string, 0, len(patterns)+1)
newPatterns = append(newPatterns, prepend)
for _, pattern := range patterns {
if strings.Trim(strings.TrimSpace(pattern), "/") == "" {
continue
}
newPatterns = append(newPatterns, path.Join(prepend, pattern, "/"))
}
return newPatterns
}
156 changes: 156 additions & 0 deletions http/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
"github.com/hashicorp/vault/vault/cluster"
"github.com/stretchr/testify/assert"
"nhooyr.io/websocket"
)

Expand Down Expand Up @@ -134,6 +135,161 @@ func TestEventsSubscribe(t *testing.T) {
}
}

// TestEventsSubscribeNamespaces tests the websocket endpoint for subscribing to events in multiple namespaces.
func TestEventsSubscribeNamespaces(t *testing.T) {
core := vault.TestCoreWithConfig(t, &vault.CoreConfig{
Experiments: []string{experiments.VaultExperimentEventsAlpha1},
})

ln, addr := TestServer(t, core)
defer ln.Close()

// unseal the core
keys, token := vault.TestCoreInit(t, core)
for _, key := range keys {
_, err := core.Unseal(key)
if err != nil {
t.Fatal(err)
}
}

stop := atomic.Bool{}

const eventType = "abc"

namespaces := []string{
"",
"ns1",
"ns2",
"ns1/ns13",
"ns1/ns13/ns134",
}

// send some events with the specified namespaces
sendEvents := func() {
pluginInfo := &logical.EventPluginInfo{
MountPath: "secret",
}
for i := range namespaces {
var ns *namespace.Namespace
if namespaces[i] == "" {
ns = namespace.RootNamespace
} else {
ns = &namespace.Namespace{
ID: namespaces[i],
Path: namespaces[i],
CustomMetadata: nil,
}
}
id, err := uuid.GenerateUUID()
if err != nil {
core.Logger().Info("Error generating UUID, exiting sender", "error", err)
}
err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), ns, pluginInfo, eventType, &logical.EventData{
Id: id,
Metadata: nil,
EntityIds: nil,
Note: "testing",
})
if err != nil {
core.Logger().Info("Error sending event, exiting sender", "error", err)
}
}
}

t.Cleanup(func() {
stop.Store(true)
})

ctx := context.Background()
wsAddr := strings.Replace(addr, "http", "ws", 1)

testCases := []struct {
name string
namespaces []string
expectedEvents int
}{
{"invalid", []string{"something"}, 1},
{"simple wildcard", []string{"ns*"}, 5},
{"two namespaces", []string{"ns1/ns13", "ns1/other"}, 2},
{"no namespace", []string{""}, 1},
{"all wildcard", []string{"*"}, 5},
{"mixed wildcard", []string{"ns1/ns13*", "ns2"}, 4},
{"overlapping wildcard", []string{"ns*", "ns1"}, 5},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
extra := ""
for _, ns := range testCase.namespaces {
extra += "&namespaces=" + ns
}
url := fmt.Sprintf("%s/v1/sys/events/subscribe/%s?json=true%v", wsAddr, eventType, extra)
conn, _, err := websocket.Dial(ctx, url, &websocket.DialOptions{
HTTPHeader: http.Header{"x-vault-token": []string{token}},
})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
conn.Close(websocket.StatusNormalClosure, "")
})
sendEvents()
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
gotEvents := 0
for {
_, msg, err := conn.Read(ctx)
if err != nil {
break
}
event := map[string]interface{}{}
err = json.Unmarshal(msg, &event)
if err != nil {
t.Fatal(err)
}
t.Log(string(msg))
gotEvents += 1
}
assert.Equal(t, testCase.expectedEvents, gotEvents)
})
}
}

func TestNamespacePrepend(t *testing.T) {
testCases := []struct {
requestNs string
patterns []string
result []string
}{
{"", []string{"ns*"}, []string{"", "ns*"}},
{"ns1", []string{"ns*"}, []string{"ns1", "ns1/ns*"}},
{"ns1", []string{"ns1*"}, []string{"ns1", "ns1/ns1*"}},
{"ns1", []string{"ns1/*"}, []string{"ns1", "ns1/ns1/*"}},
{"", []string{"ns1/ns13", "ns1/other"}, []string{"", "ns1/ns13", "ns1/other"}},
{"ns1", []string{"ns1/ns13", "ns1/other"}, []string{"ns1", "ns1/ns1/ns13", "ns1/ns1/other"}},
{"", []string{""}, []string{""}},
{"", nil, []string{""}},
{"ns1", []string{""}, []string{"ns1"}},
{"ns1", []string{"", ""}, []string{"ns1"}},
{"ns1", []string{"ns1"}, []string{"ns1", "ns1/ns1"}},
{"", []string{"*"}, []string{"", "*"}},
{"ns1", []string{"*"}, []string{"ns1", "ns1/*"}},
{"", []string{"ns1/ns13*", "ns2"}, []string{"", "ns1/ns13*", "ns2"}},
{"ns1", []string{"ns1/ns13*", "ns2"}, []string{"ns1", "ns1/ns1/ns13*", "ns1/ns2"}},
{"", []string{"ns*", "ns1"}, []string{"", "ns*", "ns1"}},
{"ns1", []string{"ns*", "ns1"}, []string{"ns1", "ns1/ns*", "ns1/ns1"}},
{"ns1", []string{"ns1*", "ns1"}, []string{"ns1", "ns1/ns1*", "ns1/ns1"}},
{"ns1", []string{"ns1/*", "ns1"}, []string{"ns1", "ns1/ns1/*", "ns1/ns1"}},
}
for _, testCase := range testCases {
t.Run(testCase.requestNs+" "+strings.Join(testCase.patterns, " "), func(t *testing.T) {
result := prependNamespacePatterns(testCase.patterns, &namespace.Namespace{ID: testCase.requestNs, Path: testCase.requestNs})
assert.Equal(t, testCase.result, result)
})
}
}

func checkRequiredCloudEventsFields(t *testing.T, event map[string]interface{}) {
t.Helper()
for _, attr := range []string{"id", "source", "specversion", "type"} {
Expand Down
26 changes: 19 additions & 7 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func NewEventBus(logger hclog.Logger) (*EventBus, error) {
}

func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
return bus.SubscribeMultipleNamespaces(ctx, []string{ns.Path}, pattern)
}

func (bus *EventBus) SubscribeMultipleNamespaces(ctx context.Context, namespacePathPatterns []string, pattern string) (<-chan *eventlogger.Event, context.CancelFunc, error) {
// subscriptions are still stored even if the bus has not been started
pipelineID, err := uuid.GenerateUUID()
if err != nil {
Expand All @@ -213,7 +217,7 @@ func (bus *EventBus) Subscribe(ctx context.Context, ns *namespace.Namespace, pat
return nil, nil, err
}

filterNode := newFilterNode(ns, pattern)
filterNode := newFilterNode(namespacePathPatterns, pattern)
err = bus.broker.RegisterNode(eventlogger.NodeID(filterNodeID), filterNode)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -259,15 +263,23 @@ func (bus *EventBus) SetSendTimeout(timeout time.Duration) {
bus.timeout = timeout
}

func newFilterNode(ns *namespace.Namespace, pattern string) *eventlogger.Filter {
func newFilterNode(namespacePatterns []string, pattern string) *eventlogger.Filter {
return &eventlogger.Filter{
Predicate: func(e *eventlogger.Event) (bool, error) {
eventRecv := e.Payload.(*logical.EventReceived)

// Drop if event is not in our namespace.
// TODO: add wildcard/child namespace processing here in some cases?
if eventRecv.Namespace != ns.Path {
return false, nil
eventNs := strings.Trim(eventRecv.Namespace, "/")
// Drop if event is not in namespace patterns namespace.
if len(namespacePatterns) > 0 {
allow := false
for _, nsPattern := range namespacePatterns {
if glob.Glob(nsPattern, eventNs) {
allow = true
break
}
}
if !allow {
return false, nil
}
}

// Filter for correct event type, including wildcards.
Expand Down

0 comments on commit 3e900fd

Please sign in to comment.