diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 3a66041ceaf..fb38b381cc7 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,6 +2,7 @@ package nomad import ( "bytes" + "context" "fmt" "reflect" "strings" @@ -15,6 +16,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" @@ -3276,3 +3278,156 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { t.Fatalf("bad: \n%#v\n%#v", out2, ns2) } } + +func TestFSM_ACLEvents_ACLToken(t *testing.T) { + t.Parallel() + + cases := []struct { + desc string + setupfn func(t *testing.T, fsm *nomadFSM) + raftReq func(t *testing.T) []byte + reqTopic structs.Topic + eventfn func(t *testing.T, e []structs.Event) + }{ + { + desc: "ACLToken upserted", + raftReq: func(t *testing.T) []byte { + req := structs.ACLTokenUpsertRequest{ + Tokens: []*structs.ACLToken{mock.ACLToken()}, + } + buf, err := structs.Encode(structs.ACLTokenUpsertRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLToken, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLToken) + require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID) + require.Equal(t, e[0].Type, structs.TypeACLTokenUpserted) + }, + }, + { + desc: "ACLToken deleted", + setupfn: func(t *testing.T, fsm *nomadFSM) { + token := mock.ACLToken() + token.SecretID = "26be01d3-df3a-45e9-9f49-4487a3dc3496" + token.AccessorID = "b971acba-bbe5-4274-bdfa-8bb1f542a8c1" + + require.NoError(t, + fsm.State().UpsertACLTokens( + structs.MsgTypeTestSetup, 10, []*structs.ACLToken{token})) + }, + raftReq: func(t *testing.T) []byte { + req := structs.ACLTokenDeleteRequest{ + AccessorIDs: []string{"b971acba-bbe5-4274-bdfa-8bb1f542a8c1"}, + } + buf, err := structs.Encode(structs.ACLTokenDeleteRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLToken, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLToken) + require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID) + require.Equal(t, e[0].Type, structs.TypeACLTokenDeleted) + }, + }, + { + desc: "ACLPolicy upserted", + raftReq: func(t *testing.T) []byte { + req := structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{mock.ACLPolicy()}, + } + buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLPolicy, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLPolicy) + require.Equal(t, e[0].Type, structs.TypeACLPolicyUpserted) + }, + }, + { + desc: "ACLPolicy deleted", + setupfn: func(t *testing.T, fsm *nomadFSM) { + policy := mock.ACLPolicy() + policy.Name = "some-policy" + + require.NoError(t, + fsm.State().UpsertACLPolicies( + structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy})) + }, + raftReq: func(t *testing.T) []byte { + req := structs.ACLPolicyDeleteRequest{ + Names: []string{"some-policy"}, + } + buf, err := structs.Encode(structs.ACLPolicyDeleteRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLPolicy, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLPolicy) + require.Equal(t, e[0].Type, structs.TypeACLPolicyDeleted) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + fsm := testFSM(t) + + // Setup any state necessary + if tc.setupfn != nil { + tc.setupfn(t, fsm) + } + + // Apply the log + resp := fsm.Apply(makeLog(tc.raftReq(t))) + require.Nil(t, resp) + + broker, err := fsm.State().EventBroker() + require.NoError(t, err) + + subReq := &stream.SubscribeRequest{ + Topics: map[structs.Topic][]string{ + tc.reqTopic: {"*"}, + }, + } + + sub, err := broker.Subscribe(subReq) + require.NoError(t, err) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(100*time.Millisecond)) + defer cancel() + + var events []structs.Event + for { + out, err := sub.Next(ctx) + if len(out.Events) == 0 { + break + } + + // consume the queue until the deadline has exceeded or until we've + // received more events than expected + if err == context.DeadlineExceeded { + break + } + require.NoError(t, err) + + events = append(events, out.Events...) + + if len(events) >= 1 { + break + } + + } + tc.eventfn(t, events) + }) + } +} diff --git a/nomad/state/events.go b/nomad/state/events.go index 96fa7bfb893..2815703541a 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -56,12 +56,15 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { if !ok { return structs.Event{}, false } + + // Copy token and empty out secret ID + token := before.Copy() + token.SecretID = "" + return structs.Event{ - Topic: structs.TopicACLToken, - Key: before.AccessorID, - Payload: structs.ACLTokenEvent{ - ACLToken: before, - }, + Topic: structs.TopicACLToken, + Key: token.AccessorID, + Payload: structs.NewACLTokenEvent(before.SecretID, token), }, true case "acl_policy": before, ok := change.Before.(*structs.ACLPolicy) @@ -71,7 +74,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicACLPolicy, Key: before.Name, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: before, }, }, true @@ -102,12 +105,15 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { if !ok { return structs.Event{}, false } + + // Copy token and empty out secret ID + token := after.Copy() + token.SecretID = "" + return structs.Event{ - Topic: structs.TopicACLToken, - Key: after.AccessorID, - Payload: structs.ACLTokenEvent{ - ACLToken: after, - }, + Topic: structs.TopicACLToken, + Key: token.AccessorID, + Payload: structs.NewACLTokenEvent(after.SecretID, token), }, true case "acl_policy": after, ok := change.After.(*structs.ACLPolicy) @@ -117,7 +123,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicACLPolicy, Key: after.Name, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: after, }, }, true diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 919bbcbcf84..3eba439d901 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -41,6 +41,59 @@ func TestEventFromChange_SingleEventPerTable(t *testing.T) { require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered) } +func TestEventFromChange_ACLTokenSecretID(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + token := mock.ACLToken() + require.NotEmpty(t, token.SecretID) + + // Create + changes := Changes{ + Index: 100, + MsgType: structs.NodeRegisterRequestType, + Changes: memdb.Changes{ + { + Table: "acl_token", + Before: nil, + After: token, + }, + }, + } + + out := eventsFromChanges(s.db.ReadTxn(), changes) + require.Len(t, out.Events, 1) + // Ensure original value not altered + require.NotEmpty(t, token.SecretID) + + aclTokenEvent, ok := out.Events[0].Payload.(*structs.ACLTokenEvent) + require.True(t, ok) + require.Empty(t, aclTokenEvent.ACLToken.SecretID) + + require.Equal(t, token.SecretID, aclTokenEvent.SecretID()) + + // Delete + changes = Changes{ + Index: 100, + MsgType: structs.NodeDeregisterRequestType, + Changes: memdb.Changes{ + { + Table: "acl_token", + Before: token, + After: nil, + }, + }, + } + + out2 := eventsFromChanges(s.db.ReadTxn(), changes) + require.Len(t, out2.Events, 1) + + tokenEvent2, ok := out2.Events[0].Payload.(*structs.ACLTokenEvent) + require.True(t, ok) + require.Empty(t, tokenEvent2.ACLToken.SecretID) +} + // TestEventFromChange_NodeSecretID ensures that a node's secret ID is not // included in a node event func TestEventFromChange_NodeSecretID(t *testing.T) { diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 1625d976c7f..7d19728eba2 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -29,8 +29,7 @@ type EventBrokerCfg struct { type EventBroker struct { // mu protects subscriptions - mu sync.Mutex - + mu sync.Mutex subscriptions *subscriptions // eventBuf stores a configurable amount of events in memory @@ -189,8 +188,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return case update := <-e.aclCh: switch payload := update.Payload.(type) { - case structs.ACLTokenEvent: - tokenSecretID := payload.ACLToken.SecretID + case *structs.ACLTokenEvent: + tokenSecretID := payload.SecretID() // Token was deleted if update.Type == structs.TypeACLTokenDeleted { @@ -214,7 +213,7 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return !aclAllowsSubscription(aclObj, sub.req) }) - case structs.ACLPolicyEvent: + case *structs.ACLPolicyEvent: // Re-evaluate each subscriptions permissions since a policy // change may or may not affect the subscription e.checkSubscriptionsAgainstPolicyChange() diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index 633d6fda7dd..bc33dc6d9df 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -116,7 +116,7 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state)) } -func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) { +func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -133,13 +133,9 @@ func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) { defer sub1.Unsubscribe() aclEvent := structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenDeleted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: "foo", - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenDeleted, + Payload: structs.NewACLTokenEvent("foo", &structs.ACLToken{}), } publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}}) diff --git a/nomad/structs/event.go b/nomad/structs/event.go index b18ed388bfa..b66ef5cdc69 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -120,6 +120,19 @@ type NodeStreamEvent struct { type ACLTokenEvent struct { ACLToken *ACLToken + secretID string +} + +// NewACLTokenEvent takes a secretID and token and creates a new ACLTokenEvent. +func NewACLTokenEvent(secretID string, token *ACLToken) *ACLTokenEvent { + return &ACLTokenEvent{ + ACLToken: token, + secretID: secretID, + } +} + +func (a *ACLTokenEvent) SecretID() string { + return a.secretID } type ACLPolicyEvent struct { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6d21c694df0..3aa91a6562a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10813,6 +10813,18 @@ type ACLToken struct { ModifyIndex uint64 } +func (a *ACLToken) Copy() *ACLToken { + c := new(ACLToken) + *c = *a + + c.Policies = make([]string, len(a.Policies)) + copy(c.Policies, a.Policies) + c.Hash = make([]byte, len(a.Hash)) + copy(c.Hash, a.Hash) + + return c +} + var ( // AnonymousACLToken is used no SecretID is provided, and the // request is made anonymously.