Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events/acl events #9595

Merged
merged 6 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
IMPROVEMENTS:
* consul/connect: interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)]

BUG FIXES:
* core: Fixed a bug where ACLToken and ACLPolicy changes were ignored by the event stream [[GH-9595](https://github.com/hashicorp/nomad/issues/9595)]

## 1.0.0 (December 8, 2020)

FEATURES:
Expand Down
146 changes: 146 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
Expand Down Expand Up @@ -3276,3 +3277,148 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) {
t.Fatalf("bad: \n%#v\n%#v", out2, ns2)
}
}

func TestFSM_ACLEvents(t *testing.T) {
t.Parallel()

cases := []struct {
desc string
setupfn func(t *testing.T, fsm *nomadFSM)
raftReq func(t *testing.T) []byte
reqTopic structs.Topic
eventfn func(t *testing.T, e []structs.Event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like how this test got structured with these testing funcs as test cases. 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I think this one is right in the sweet spot for it being legible. table tests with huge tables and small test case runs can get hard to read 🤷‍♂️

}{
{
desc: "ACLToken upserted",
raftReq: func(t *testing.T) []byte {
req := structs.ACLTokenUpsertRequest{
Tokens: []*structs.ACLToken{mock.ACLToken()},
}
buf, err := structs.Encode(structs.ACLTokenUpsertRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLToken,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLToken)
require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID)
require.Equal(t, e[0].Type, structs.TypeACLTokenUpserted)
},
},
{
desc: "ACLToken deleted",
setupfn: func(t *testing.T, fsm *nomadFSM) {
token := mock.ACLToken()
token.SecretID = "26be01d3-df3a-45e9-9f49-4487a3dc3496"
token.AccessorID = "b971acba-bbe5-4274-bdfa-8bb1f542a8c1"

require.NoError(t,
fsm.State().UpsertACLTokens(
structs.MsgTypeTestSetup, 10, []*structs.ACLToken{token}))
},
raftReq: func(t *testing.T) []byte {
req := structs.ACLTokenDeleteRequest{
AccessorIDs: []string{"b971acba-bbe5-4274-bdfa-8bb1f542a8c1"},
}
buf, err := structs.Encode(structs.ACLTokenDeleteRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLToken,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLToken)
require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID)
require.Equal(t, e[0].Type, structs.TypeACLTokenDeleted)
},
},
{
desc: "ACLPolicy upserted",
raftReq: func(t *testing.T) []byte {
req := structs.ACLPolicyUpsertRequest{
Policies: []*structs.ACLPolicy{mock.ACLPolicy()},
}
buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLPolicy,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLPolicy)
require.Equal(t, e[0].Type, structs.TypeACLPolicyUpserted)
},
},
{
desc: "ACLPolicy deleted",
setupfn: func(t *testing.T, fsm *nomadFSM) {
policy := mock.ACLPolicy()
policy.Name = "some-policy"

require.NoError(t,
fsm.State().UpsertACLPolicies(
structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy}))
},
raftReq: func(t *testing.T) []byte {
req := structs.ACLPolicyDeleteRequest{
Names: []string{"some-policy"},
}
buf, err := structs.Encode(structs.ACLPolicyDeleteRequestType, req)
require.NoError(t, err)
return buf
},
reqTopic: structs.TopicACLPolicy,
eventfn: func(t *testing.T, e []structs.Event) {
require.Len(t, e, 1)
require.Equal(t, e[0].Topic, structs.TopicACLPolicy)
require.Equal(t, e[0].Type, structs.TypeACLPolicyDeleted)
},
},
}

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
fsm := testFSM(t)

// Setup any state necessary
if tc.setupfn != nil {
tc.setupfn(t, fsm)
}

// Apply the log
resp := fsm.Apply(makeLog(tc.raftReq(t)))
require.Nil(t, resp)

broker, err := fsm.State().EventBroker()
require.NoError(t, err)

subReq := &stream.SubscribeRequest{
Topics: map[structs.Topic][]string{
tc.reqTopic: {"*"},
},
}

sub, err := broker.Subscribe(subReq)
require.NoError(t, err)

var events []structs.Event

testutil.WaitForResult(func() (bool, error) {
out, err := sub.NextNoBlock()
require.NoError(t, err)

if out == nil {
return false, fmt.Errorf("expected events got nil")
}

events = out
return true, nil
}, func(err error) {
require.Fail(t, err.Error())
})

tc.eventfn(t, events)
})
}
}
22 changes: 10 additions & 12 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
if !ok {
return structs.Event{}, false
}

return structs.Event{
Topic: structs.TopicACLToken,
Key: before.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: before,
},
Topic: structs.TopicACLToken,
Key: before.AccessorID,
Payload: structs.NewACLTokenEvent(before),
}, true
case "acl_policy":
before, ok := change.Before.(*structs.ACLPolicy)
Expand All @@ -71,7 +70,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: before.Name,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: before,
},
}, true
Expand Down Expand Up @@ -102,12 +101,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
if !ok {
return structs.Event{}, false
}

return structs.Event{
Topic: structs.TopicACLToken,
Key: after.AccessorID,
Payload: structs.ACLTokenEvent{
ACLToken: after,
},
Topic: structs.TopicACLToken,
Key: after.AccessorID,
Payload: structs.NewACLTokenEvent(after),
}, true
case "acl_policy":
after, ok := change.After.(*structs.ACLPolicy)
Expand All @@ -117,7 +115,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
return structs.Event{
Topic: structs.TopicACLPolicy,
Key: after.Name,
Payload: structs.ACLPolicyEvent{
Payload: &structs.ACLPolicyEvent{
ACLPolicy: after,
},
}, true
Expand Down
53 changes: 53 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,59 @@ func TestEventFromChange_SingleEventPerTable(t *testing.T) {
require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered)
}

func TestEventFromChange_ACLTokenSecretID(t *testing.T) {
t.Parallel()
s := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer s.StopEventBroker()

token := mock.ACLToken()
require.NotEmpty(t, token.SecretID)

// Create
changes := Changes{
Index: 100,
MsgType: structs.NodeRegisterRequestType,
Changes: memdb.Changes{
{
Table: "acl_token",
Before: nil,
After: token,
},
},
}

out := eventsFromChanges(s.db.ReadTxn(), changes)
require.Len(t, out.Events, 1)
// Ensure original value not altered
require.NotEmpty(t, token.SecretID)

aclTokenEvent, ok := out.Events[0].Payload.(*structs.ACLTokenEvent)
require.True(t, ok)
require.Empty(t, aclTokenEvent.ACLToken.SecretID)

require.Equal(t, token.SecretID, aclTokenEvent.SecretID())

// Delete
changes = Changes{
Index: 100,
MsgType: structs.NodeDeregisterRequestType,
Changes: memdb.Changes{
{
Table: "acl_token",
Before: token,
After: nil,
},
},
}

out2 := eventsFromChanges(s.db.ReadTxn(), changes)
require.Len(t, out2.Events, 1)

tokenEvent2, ok := out2.Events[0].Payload.(*structs.ACLTokenEvent)
require.True(t, ok)
require.Empty(t, tokenEvent2.ACLToken.SecretID)
}

// TestEventFromChange_NodeSecretID ensures that a node's secret ID is not
// included in a node event
func TestEventFromChange_NodeSecretID(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5028,7 +5028,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J

// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()

for _, policy := range policies {
Expand Down Expand Up @@ -5128,7 +5128,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error

// UpsertACLTokens is used to create or update a set of ACL tokens
func (s *StateStore) UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()

for _, token := range tokens {
Expand Down
16 changes: 10 additions & 6 deletions nomad/stream/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ type EventBrokerCfg struct {

type EventBroker struct {
// mu protects subscriptions
mu sync.Mutex

mu sync.Mutex
subscriptions *subscriptions

// eventBuf stores a configurable amount of events in memory
Expand Down Expand Up @@ -189,8 +188,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return
case update := <-e.aclCh:
switch payload := update.Payload.(type) {
case structs.ACLTokenEvent:
tokenSecretID := payload.ACLToken.SecretID
case *structs.ACLTokenEvent:
tokenSecretID := payload.SecretID()

// Token was deleted
if update.Type == structs.TypeACLTokenDeleted {
Expand All @@ -214,7 +213,7 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
return !aclAllowsSubscription(aclObj, sub.req)
})

case structs.ACLPolicyEvent:
case *structs.ACLPolicyEvent:
// Re-evaluate each subscriptions permissions since a policy
// change may or may not affect the subscription
e.checkSubscriptionsAgainstPolicyChange()
Expand All @@ -240,9 +239,14 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() {

aclSnapshot := e.aclDelegate.TokenProvider()
for tokenSecretID := range e.subscriptions.byToken {
// if tokenSecretID is empty ACLs were disabled at time of subscribing
if tokenSecretID == "" {
continue
}

aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
if err != nil || aclObj == nil {
e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err)
e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
continue
}
Expand Down
Loading