From 1ce744631db6378db15a8c9aa2f7d264ea6abc45 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 3 Dec 2020 10:43:40 -0500 Subject: [PATCH 1/6] fix acl event creation --- nomad/state/state_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 93a8571a549..1c749827327 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5028,7 +5028,7 @@ func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.J // UpsertACLPolicies is used to create or update a set of ACL policies func (s *StateStore) UpsertACLPolicies(msgType structs.MessageType, index uint64, policies []*structs.ACLPolicy) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() for _, policy := range policies { @@ -5128,7 +5128,7 @@ func (s *StateStore) ACLPolicies(ws memdb.WatchSet) (memdb.ResultIterator, error // UpsertACLTokens is used to create or update a set of ACL tokens func (s *StateStore) UpsertACLTokens(msgType structs.MessageType, index uint64, tokens []*structs.ACLToken) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() for _, token := range tokens { From be313dafb5946cf1d6b648dccee4ec0f802e186a Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Wed, 9 Dec 2020 14:42:21 -0500 Subject: [PATCH 2/6] allow way to access secretID without exposing it to stream test that values are omitted test event creation test acl events payloads are pointers fix failing tests, do all security steps inside constructor --- nomad/fsm_test.go | 155 ++++++++++++++++++++++++++++++ nomad/state/events.go | 22 ++--- nomad/state/events_test.go | 53 ++++++++++ nomad/stream/event_broker.go | 9 +- nomad/stream/event_broker_test.go | 108 +++++++-------------- nomad/structs/event.go | 17 ++++ nomad/structs/structs.go | 12 +++ 7 files changed, 285 insertions(+), 91 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 3a66041ceaf..78ce17307cd 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,6 +2,7 @@ package nomad import ( "bytes" + "context" "fmt" "reflect" "strings" @@ -15,6 +16,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" @@ -3276,3 +3278,156 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { t.Fatalf("bad: \n%#v\n%#v", out2, ns2) } } + +func TestFSM_ACLEvents_ACLToken(t *testing.T) { + t.Parallel() + + cases := []struct { + desc string + setupfn func(t *testing.T, fsm *nomadFSM) + raftReq func(t *testing.T) []byte + reqTopic structs.Topic + eventfn func(t *testing.T, e []structs.Event) + }{ + { + desc: "ACLToken upserted", + raftReq: func(t *testing.T) []byte { + req := structs.ACLTokenUpsertRequest{ + Tokens: []*structs.ACLToken{mock.ACLToken()}, + } + buf, err := structs.Encode(structs.ACLTokenUpsertRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLToken, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLToken) + require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID) + require.Equal(t, e[0].Type, structs.TypeACLTokenUpserted) + }, + }, + { + desc: "ACLToken deleted", + setupfn: func(t *testing.T, fsm *nomadFSM) { + token := mock.ACLToken() + token.SecretID = "26be01d3-df3a-45e9-9f49-4487a3dc3496" + token.AccessorID = "b971acba-bbe5-4274-bdfa-8bb1f542a8c1" + + require.NoError(t, + fsm.State().UpsertACLTokens( + structs.MsgTypeTestSetup, 10, []*structs.ACLToken{token})) + }, + raftReq: func(t *testing.T) []byte { + req := structs.ACLTokenDeleteRequest{ + AccessorIDs: []string{"b971acba-bbe5-4274-bdfa-8bb1f542a8c1"}, + } + buf, err := structs.Encode(structs.ACLTokenDeleteRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLToken, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLToken) + require.Empty(t, e[0].Payload.(*structs.ACLTokenEvent).ACLToken.SecretID) + require.Equal(t, e[0].Type, structs.TypeACLTokenDeleted) + }, + }, + { + desc: "ACLPolicy upserted", + raftReq: func(t *testing.T) []byte { + req := structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{mock.ACLPolicy()}, + } + buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLPolicy, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLPolicy) + require.Equal(t, e[0].Type, structs.TypeACLPolicyUpserted) + }, + }, + { + desc: "ACLPolicy deleted", + setupfn: func(t *testing.T, fsm *nomadFSM) { + policy := mock.ACLPolicy() + policy.Name = "some-policy" + + require.NoError(t, + fsm.State().UpsertACLPolicies( + structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy})) + }, + raftReq: func(t *testing.T) []byte { + req := structs.ACLPolicyDeleteRequest{ + Names: []string{"some-policy"}, + } + buf, err := structs.Encode(structs.ACLPolicyDeleteRequestType, req) + require.NoError(t, err) + return buf + }, + reqTopic: structs.TopicACLPolicy, + eventfn: func(t *testing.T, e []structs.Event) { + require.Len(t, e, 1) + require.Equal(t, e[0].Topic, structs.TopicACLPolicy) + require.Equal(t, e[0].Type, structs.TypeACLPolicyDeleted) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + fsm := testFSM(t) + + // Setup any state necessary + if tc.setupfn != nil { + tc.setupfn(t, fsm) + } + + // Apply the log + resp := fsm.Apply(makeLog(tc.raftReq(t))) + require.Nil(t, resp) + + broker, err := fsm.State().EventBroker() + require.NoError(t, err) + + subReq := &stream.SubscribeRequest{ + Topics: map[structs.Topic][]string{ + tc.reqTopic: {"*"}, + }, + } + + sub, err := broker.Subscribe(subReq) + require.NoError(t, err) + + var events []structs.Event + for { + deadline := time.Duration(testutil.TestMultiplier()*100) * time.Millisecond + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(deadline)) + defer cancel() + out, err := sub.Next(ctx) + if len(out.Events) == 0 { + break + } + + // consume the queue until the deadline has exceeded or until we've + // received more events than expected + if err == context.DeadlineExceeded { + break + } + require.NoError(t, err) + + events = append(events, out.Events...) + + if len(events) >= 1 { + break + } + + } + tc.eventfn(t, events) + }) + } +} diff --git a/nomad/state/events.go b/nomad/state/events.go index 96fa7bfb893..ab4a086f598 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -56,12 +56,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { if !ok { return structs.Event{}, false } + return structs.Event{ - Topic: structs.TopicACLToken, - Key: before.AccessorID, - Payload: structs.ACLTokenEvent{ - ACLToken: before, - }, + Topic: structs.TopicACLToken, + Key: before.AccessorID, + Payload: structs.NewACLTokenEvent(before), }, true case "acl_policy": before, ok := change.Before.(*structs.ACLPolicy) @@ -71,7 +70,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicACLPolicy, Key: before.Name, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: before, }, }, true @@ -102,12 +101,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { if !ok { return structs.Event{}, false } + return structs.Event{ - Topic: structs.TopicACLToken, - Key: after.AccessorID, - Payload: structs.ACLTokenEvent{ - ACLToken: after, - }, + Topic: structs.TopicACLToken, + Key: after.AccessorID, + Payload: structs.NewACLTokenEvent(after), }, true case "acl_policy": after, ok := change.After.(*structs.ACLPolicy) @@ -117,7 +115,7 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{ Topic: structs.TopicACLPolicy, Key: after.Name, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: after, }, }, true diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 919bbcbcf84..3eba439d901 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -41,6 +41,59 @@ func TestEventFromChange_SingleEventPerTable(t *testing.T) { require.Equal(t, out.Events[0].Type, structs.TypeJobRegistered) } +func TestEventFromChange_ACLTokenSecretID(t *testing.T) { + t.Parallel() + s := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer s.StopEventBroker() + + token := mock.ACLToken() + require.NotEmpty(t, token.SecretID) + + // Create + changes := Changes{ + Index: 100, + MsgType: structs.NodeRegisterRequestType, + Changes: memdb.Changes{ + { + Table: "acl_token", + Before: nil, + After: token, + }, + }, + } + + out := eventsFromChanges(s.db.ReadTxn(), changes) + require.Len(t, out.Events, 1) + // Ensure original value not altered + require.NotEmpty(t, token.SecretID) + + aclTokenEvent, ok := out.Events[0].Payload.(*structs.ACLTokenEvent) + require.True(t, ok) + require.Empty(t, aclTokenEvent.ACLToken.SecretID) + + require.Equal(t, token.SecretID, aclTokenEvent.SecretID()) + + // Delete + changes = Changes{ + Index: 100, + MsgType: structs.NodeDeregisterRequestType, + Changes: memdb.Changes{ + { + Table: "acl_token", + Before: token, + After: nil, + }, + }, + } + + out2 := eventsFromChanges(s.db.ReadTxn(), changes) + require.Len(t, out2.Events, 1) + + tokenEvent2, ok := out2.Events[0].Payload.(*structs.ACLTokenEvent) + require.True(t, ok) + require.Empty(t, tokenEvent2.ACLToken.SecretID) +} + // TestEventFromChange_NodeSecretID ensures that a node's secret ID is not // included in a node event func TestEventFromChange_NodeSecretID(t *testing.T) { diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 1625d976c7f..7d19728eba2 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -29,8 +29,7 @@ type EventBrokerCfg struct { type EventBroker struct { // mu protects subscriptions - mu sync.Mutex - + mu sync.Mutex subscriptions *subscriptions // eventBuf stores a configurable amount of events in memory @@ -189,8 +188,8 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return case update := <-e.aclCh: switch payload := update.Payload.(type) { - case structs.ACLTokenEvent: - tokenSecretID := payload.ACLToken.SecretID + case *structs.ACLTokenEvent: + tokenSecretID := payload.SecretID() // Token was deleted if update.Type == structs.TypeACLTokenDeleted { @@ -214,7 +213,7 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) { return !aclAllowsSubscription(aclObj, sub.req) }) - case structs.ACLPolicyEvent: + case *structs.ACLPolicyEvent: // Re-evaluate each subscriptions permissions since a policy // change may or may not affect the subscription e.checkSubscriptionsAgainstPolicyChange() diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index 633d6fda7dd..0e34247106f 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -116,7 +116,7 @@ func TestEventBroker_EmptyReqToken_DistinctSubscriptions(t *testing.T) { require.Equal(t, subscriptionStateOpen, atomic.LoadUint32(&sub2.state)) } -func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) { +func TestEventBroker_handleACLUpdates_TokenDeleted(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -133,13 +133,9 @@ func TestEventBroker_handleACLUpdates_tokendeleted(t *testing.T) { defer sub1.Unsubscribe() aclEvent := structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenDeleted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: "foo", - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenDeleted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: "foo"}), } publisher.Publish(&structs.Events{Index: 100, Events: []structs.Event{aclEvent}}) @@ -209,13 +205,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -233,13 +225,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -257,13 +245,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -281,13 +265,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -305,13 +285,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -329,13 +305,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -353,13 +325,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -377,13 +345,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -400,13 +364,9 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { }, }, policyEvent: structs.Event{ - Topic: structs.TopicACLToken, - Type: structs.TypeACLTokenUpserted, - Payload: structs.ACLTokenEvent{ - ACLToken: &structs.ACLToken{ - SecretID: secretID, - }, - }, + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, { @@ -426,7 +386,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { policyEvent: structs.Event{ Topic: structs.TopicACLPolicy, Type: structs.TypeACLPolicyUpserted, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: &structs.ACLPolicy{ Name: "some-policy", }, @@ -450,7 +410,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { policyEvent: structs.Event{ Topic: structs.TopicACLPolicy, Type: structs.TypeACLPolicyUpserted, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: &structs.ACLPolicy{ Name: "some-policy", }, @@ -474,7 +434,7 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { policyEvent: structs.Event{ Topic: structs.TopicACLPolicy, Type: structs.TypeACLPolicyDeleted, - Payload: structs.ACLPolicyEvent{ + Payload: &structs.ACLPolicyEvent{ ACLPolicy: &structs.ACLPolicy{ Name: "some-policy", }, diff --git a/nomad/structs/event.go b/nomad/structs/event.go index b18ed388bfa..79cf0bebc37 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -120,6 +120,23 @@ type NodeStreamEvent struct { type ACLTokenEvent struct { ACLToken *ACLToken + secretID string +} + +// NewACLTokenEvent takes a token and creates a new ACLTokenEvent. It creates +// a copy of the passed in ACLToken and empties out the copied tokens SecretID +func NewACLTokenEvent(token *ACLToken) *ACLTokenEvent { + c := token.Copy() + c.SecretID = "" + + return &ACLTokenEvent{ + ACLToken: c, + secretID: token.SecretID, + } +} + +func (a *ACLTokenEvent) SecretID() string { + return a.secretID } type ACLPolicyEvent struct { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 74dd1304a07..9cbfe22b8b8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -10813,6 +10813,18 @@ type ACLToken struct { ModifyIndex uint64 } +func (a *ACLToken) Copy() *ACLToken { + c := new(ACLToken) + *c = *a + + c.Policies = make([]string, len(a.Policies)) + copy(c.Policies, a.Policies) + c.Hash = make([]byte, len(a.Hash)) + copy(c.Hash, a.Hash) + + return c +} + var ( // AnonymousACLToken is used no SecretID is provided, and the // request is made anonymously. From ab5947365a3aab343b5be6ad6b83dfd27577da27 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Thu, 10 Dec 2020 16:36:48 -0500 Subject: [PATCH 3/6] increase time --- nomad/fsm_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 78ce17307cd..f8de5740458 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3405,9 +3405,10 @@ func TestFSM_ACLEvents_ACLToken(t *testing.T) { var events []structs.Event for { - deadline := time.Duration(testutil.TestMultiplier()*100) * time.Millisecond + deadline := time.Duration(testutil.TestMultiplier()*200) * time.Millisecond ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(deadline)) defer cancel() + out, err := sub.Next(ctx) if len(out.Events) == 0 { break From 306d46cf70015ec394cc1f29b968d78bd0357959 Mon Sep 17 00:00:00 2001 From: Drew Bailey Date: Thu, 10 Dec 2020 18:28:16 -0500 Subject: [PATCH 4/6] ignore empty tokens --- nomad/fsm_test.go | 30 ++++++++++-------------------- nomad/stream/event_broker.go | 7 ++++++- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index f8de5740458..63baafeab1b 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -2,7 +2,6 @@ package nomad import ( "bytes" - "context" "fmt" "reflect" "strings" @@ -3404,30 +3403,21 @@ func TestFSM_ACLEvents_ACLToken(t *testing.T) { require.NoError(t, err) var events []structs.Event - for { - deadline := time.Duration(testutil.TestMultiplier()*200) * time.Millisecond - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(deadline)) - defer cancel() - - out, err := sub.Next(ctx) - if len(out.Events) == 0 { - break - } - // consume the queue until the deadline has exceeded or until we've - // received more events than expected - if err == context.DeadlineExceeded { - break - } + testutil.WaitForResult(func() (bool, error) { + out, err := sub.NextNoBlock() require.NoError(t, err) - events = append(events, out.Events...) - - if len(events) >= 1 { - break + if out == nil { + return false, fmt.Errorf("expected events got nil") } - } + events = out + return true, nil + }, func(err error) { + require.Fail(t, err.Error()) + }) + tc.eventfn(t, events) }) } diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 7d19728eba2..b460f025ede 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -239,10 +239,15 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() { aclSnapshot := e.aclDelegate.TokenProvider() for tokenSecretID := range e.subscriptions.byToken { + // if tokenSecretID is empty ACLs were disabled at time of subscribing + if tokenSecretID == "" { + continue + } + aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) if err != nil || aclObj == nil { e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err) - e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) + // e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) continue } From eb532b8a032b8554b129bc4d2c55183bce954ee3 Mon Sep 17 00:00:00 2001 From: Drew Bailey Date: Fri, 11 Dec 2020 10:21:21 -0500 Subject: [PATCH 5/6] uncomment line --- nomad/fsm_test.go | 2 +- nomad/stream/event_broker.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 63baafeab1b..20ecf18f32d 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3278,7 +3278,7 @@ func TestFSM_SnapshotRestore_Namespaces(t *testing.T) { } } -func TestFSM_ACLEvents_ACLToken(t *testing.T) { +func TestFSM_ACLEvents(t *testing.T) { t.Parallel() cases := []struct { diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index b460f025ede..7633ef348b0 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -246,8 +246,8 @@ func (e *EventBroker) checkSubscriptionsAgainstPolicyChange() { aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID) if err != nil || aclObj == nil { - e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err) - // e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) + e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err) + e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID}) continue } From ab4e69865be921b2ebd3300a95aeb48ceb237766 Mon Sep 17 00:00:00 2001 From: Drew Bailey Date: Fri, 11 Dec 2020 10:25:00 -0500 Subject: [PATCH 6/6] changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b557184768..20cb1f38193 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ IMPROVEMENTS: * consul/connect: interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)] +BUG FIXES: + * core: Fixed a bug where ACLToken and ACLPolicy changes were ignored by the event stream [[GH-9595](https://github.com/hashicorp/nomad/issues/9595)] + ## 1.0.0 (December 8, 2020) FEATURES: