diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b557184768..20cb1f38193 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ IMPROVEMENTS: * consul/connect: interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)] +BUG FIXES: + * core: Fixed a bug where ACLToken and ACLPolicy changes were ignored by the event stream [[GH-9595](https://github.com/hashicorp/nomad/issues/9595)] + ## 1.0.0 (December 8, 2020) FEATURES: diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 3a66041ceaf..20ecf18f32d 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" @@ -3276,3 +3277,148 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { t.Fatalf("bad: \n%#v\n%#v", out2, ns2) } } + +func TestFSM_ACLEvents(t *testing.T) { + t.Parallel() + + cases := []struct { + desc string + setupfn func(t *testing.T, fsm *nomadFSM) + raftReq func(t *testing.T) []byte + reqTopic structs.Topic + eventfn func(t *testing.T, e []structs.Event) + }{ + { + desc: "ACLToken upserted", + raftReq: func(t *testing.T) []byte { + req := structs.ACLTokenUpsertRequest{ + Tokens: []*structs.ACLToken{mock.ACLToken()}, + } + buf, err := structs.Encode(structs.ACLTokenUpsertRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLToken, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLToken) + require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID) + require.Equal(t, e[0].Type, structs.TypeACLTokenUpserted) + }, + }, + { + desc: "ACLToken deleted", + setupfn: func(t *testing.T, fsm *nomadFSM) { + token := mock.ACLToken() + token.SecretID = "26be01d3-df3a-45e9-9f49-4487a3dc3496" + token.AccessorID = "b971acba-bbe5-4274-bdfa-8bb1f542a8c1" + + require.NoError(t, + fsm.State().UpsertACLTokens( + structs.MsgTypeTestSetup, 10, []*structs.ACLToken{token})) + }, + raftReq: func(t *testing.T) []byte { + req := structs.ACLTokenDeleteRequest{ + AccessorIDs: []string{"b971acba-bbe5-4274-bdfa-8bb1f542a8c1"}, + } + buf, err := structs.Encode(structs.ACLTokenDeleteRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLToken, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLToken) + require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID) + require.Equal(t, e[0].Type, structs.TypeACLTokenDeleted) + }, + }, + { + desc: "ACLPolicy upserted", + raftReq: func(t *testing.T) []byte { + req := structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{mock.ACLPolicy()}, + } + buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLPolicy, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLPolicy) + require.Equal(t, e[0].Type, structs.TypeACLPolicyUpserted) + }, + }, + { + desc: "ACLPolicy deleted", + setupfn: func(t *testing.T, fsm *nomadFSM) { + policy := mock.ACLPolicy() + policy.Name = "some-policy" + + require.NoError(t, + fsm.State().UpsertACLPolicies( + structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy})) + }, + raftReq: func(t *testing.T) []byte { + req := structs.ACLPolicyDeleteRequest{ + Names: []string{"some-policy"}, + } + buf, err := structs.Encode(structs.ACLPolicyDeleteRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLPolicy, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLPolicy) + require.Equal(t, e[0].Type, structs.TypeACLPolicyDeleted) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + fsm := testFSM(t) + + // Setup any state necessary + if tc.setupfn != nil { + tc.setupfn(t, fsm) + } + + // Apply the log + resp := fsm.Apply(makeLog(tc.raftReq(t))) + require.Nil(t, resp) + + broker, err := fsm.State().EventBroker() + require.NoError(t, err) + + subReq := &stream.SubscribeRequest{ + Topics: map[structs.Topic][]string{ + tc.reqTopic: {"*"}, + }, + } + + sub, err := broker.Subscribe(subReq) + require.NoError(t, err) + + var events []structs.Event + + testutil.WaitForResult(func() (bool, error) { + out, err := sub.NextNoBlock() + require.NoError(t, err) + + if out == nil { + return false, fmt.Errorf("expected events got nil") + } + + events = out + return true, nil + }, func(err error) { + require.Fail(t, err.Error()) + }) + + tc.eventfn(t, events) + }) + } +} diff --git a/nomad/state/events.go b/nomad/state/events.go index 96fa7bfb893..ab4a086f598 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -56,12 +56,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { if !ok { return structs.Event{}, false } + return structs.Event{ - Topic: structs.TopicACLToken, - Key: before.AccessorID, - Payload: structs.ACLTokenEvent{ - ACLToken: before, - }, + Topic: structs.TopicACLToken, + Key: before.AccessorID, + Payload: structs.NewACLTokenEvent(before), }, true case "acl_policy": before, ok := change.Before.(*structs.ACLPolicy) @@ -71,7 +70,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicACLPolicy, Key: before.Name, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: before, }, }, true @@ -102,12 +101,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { if !ok { return structs.Event{}, false } + return structs.Event{ - Topic: structs.TopicACLToken, - Key: after.AccessorID, - Payload: structs.ACLTokenEvent{ - ACLToken: after, - }, + Topic: structs.TopicACLToken, + Key: after.AccessorID, + Payload: structs.NewACLTokenEvent(after), }, true case "acl_policy": after, ok := change.After.(*structs.ACLPolicy) @@ -117,7 +115,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicACLPolicy, Key: after.Name, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: after, }, }, true diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 919bbcbcf84..3eba439d901 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -41,6 +41,59 @@ func TestEventFromChange_SingleEventPerTable(t *testing.T) { require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered) } +func TestEventFromChange_ACLTokenSecretID(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + token := mock.ACLToken() + require.NotEmpty(t, token.SecretID) + + // Create + changes := Changes{ + Index: 100, + MsgType: structs.NodeRegisterRequestType, + Changes: memdb.Changes{ + { + Table: "acl_token", + Before: nil, + After: token, + }, + }, + } + + out := eventsFromChanges(s.db.ReadTxn(), changes) + require.Len(t, out.Events, 1) + // Ensure original value not altered + require.NotEmpty(t, token.SecretID) + + aclTokenEvent, ok := out.Events[0].Payload.(*structs.ACLTokenEvent) + require.True(t, ok) + require.Empty(t, aclTokenEvent.ACLToken.SecretID) + + require.Equal(t, token.SecretID, aclTokenEvent.SecretID()) + + // Delete + changes = Changes{ + Index: 100, + MsgType: structs.NodeDeregisterRequestType, + Changes: memdb.Changes{ + { + Table: "acl_token", + Before: token, + After: nil, + }, + }, + } + + out2 := eventsFromChanges(s.db.ReadTxn(), changes) + require.Len(t, out2.Events, 1) + + tokenEvent2, ok := out2.Events[0].Payload.(*structs.ACLTokenEvent) + require.True(t, ok) + require.Empty(t, tokenEvent2.ACLToken.SecretID) +} + // TestEventFromChange_NodeSecretID ensures that a node's secret ID is not // included in a node event func TestEventFromChange_NodeSecretID(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 93a8571a549..1c749827327 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5028,7 +5028,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J // UpsertACLPolicies is used to create or update a set of ACL policies func (s *StateStore) UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() for _, policy := range policies { @@ -5128,7 +5128,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error // UpsertACLTokens is used to create or update a set of ACL tokens func (s *StateStore) UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() for _, token := range tokens { diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 1625d976c7f..7633ef348b0 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -29,8 +29,7 @@ type EventBrokerCfg struct { type EventBroker struct { // mu protects subscriptions - mu sync.Mutex - + mu sync.Mutex subscriptions *subscriptions // eventBuf stores a configurable amount of events in memory @@ -189,8 +188,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return case update := <-e.aclCh: switch payload := update.Payload.(type) { - case structs.ACLTokenEvent: - tokenSecretID := payload.ACLToken.SecretID + case *structs.ACLTokenEvent: + tokenSecretID := payload.SecretID() // Token was deleted if update.Type == structs.TypeACLTokenDeleted { @@ -214,7 +213,7 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return !aclAllowsSubscription(aclObj, sub.req) }) - case structs.ACLPolicyEvent: + case *structs.ACLPolicyEvent: // Re-evaluate each subscriptions permissions since a policy // change may or may not affect the subscription e.checkSubscriptionsAgainstPolicyChange() @@ -240,9 +239,14 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() { aclSnapshot := e.aclDelegate.TokenProvider() for tokenSecretID := range e.subscriptions.byToken { + // if tokenSecretID is empty ACLs were disabled at time of subscribing + if tokenSecretID == "" { + continue + } + aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) if err != nil || aclObj == nil { - e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err) + e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err) e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) continue } diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index 633d6fda7dd..0e34247106f 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -116,7 +116,7 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state)) } -func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) { +func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -133,13 +133,9 @@ func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) { defer sub1.Unsubscribe() aclEvent := structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenDeleted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: "foo", - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenDeleted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}), } publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}}) @@ -209,13 +205,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -233,13 +225,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -257,13 +245,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -281,13 +265,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -305,13 +285,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -329,13 +305,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -353,13 +325,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -377,13 +345,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -400,13 +364,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -426,7 +386,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { policyEvent: structs.Event{ Topic: structs.TopicACLPolicy, Type: structs.TypeACLPolicyUpserted, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: &structs.ACLPolicy{ Name: "some-policy", }, @@ -450,7 +410,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { policyEvent: structs.Event{ Topic: structs.TopicACLPolicy, Type: structs.TypeACLPolicyUpserted, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: &structs.ACLPolicy{ Name: "some-policy", }, @@ -474,7 +434,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { policyEvent: structs.Event{ Topic: structs.TopicACLPolicy, Type: structs.TypeACLPolicyDeleted, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: &structs.ACLPolicy{ Name: "some-policy", }, diff --git a/nomad/structs/event.go b/nomad/structs/event.go index b18ed388bfa..79cf0bebc37 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -120,6 +120,23 @@ type NodeStreamEvent struct { type ACLTokenEvent struct { ACLToken *ACLToken + secretID string +} + +// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates +// a copy of the passed in ACLToken and empties out the copied tokens SecretID +func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent { + c := token.Copy() + c.SecretID = "" + + return &ACLTokenEvent{ + ACLToken: c, + secretID: token.SecretID, + } +} + +func (a *ACLTokenEvent) SecretID() string { + return a.secretID } type ACLPolicyEvent struct { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 74dd1304a07..9cbfe22b8b8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10813,6 +10813,18 @@ type ACLToken struct { ModifyIndex uint64 } +func (a *ACLToken) Copy() *ACLToken { + c := new(ACLToken) + *c = *a + + c.Policies = make([]string, len(a.Policies)) + copy(c.Policies, a.Policies) + c.Hash = make([]byte, len(a.Hash)) + copy(c.Hash, a.Hash) + + return c +} + var ( // AnonymousACLToken is used no SecretID is provided, and the // request is made anonymously.