From 27f3a819ad9d6c6d62735d7bddd508b4368cfcec Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 21 May 2020 07:38:27 -0400 Subject: [PATCH 1/8] on leadership establishment, revoke Vault tokens in background Establishing leadership should be very fast and never make external API calls. This fixes a situation where there is a long backlog of Vault tokens to be revoked on when leadership is gained. In such case, revoking the tokens will significantly slow down leadership establishment and slow down processing. Worse, the revocation call does not honor leadership `stopCh` signals, so it will not stop when the leader loses leadership. --- nomad/leader.go | 4 +++- nomad/vault.go | 16 ++++++++++++++++ nomad/vault_test.go | 27 +++++++++++++++++++++++++++ nomad/vault_testing.go | 5 +++++ 4 files changed, 51 insertions(+), 1 deletion(-) diff --git a/nomad/leader.go b/nomad/leader.go index 18c36ef386e..a246d4f565e 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -390,7 +390,9 @@ func (s *Server) revokeVaultAccessorsOnRestore() error { } if len(revoke) != 0 { - if err := s.vault.RevokeTokens(context.Background(), revoke, true); err != nil { + s.logger.Info("revoking vault accessors on restore", "accessors", len(revoke)) + + if err := s.vault.MarkForRevocation(revoke); err != nil { return fmt.Errorf("failed to revoke tokens: %v", err) } } diff --git a/nomad/vault.go b/nomad/vault.go index 9dbcd066008..71263cbbefb 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -121,6 +121,9 @@ type VaultClient interface { // RevokeTokens takes a set of tokens accessor and revokes the tokens RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error + // MarkForRevocation revokes the tokens in background + MarkForRevocation(accessors []*structs.VaultAccessor) error + // Stop is used to stop token renewal Stop() @@ -1128,6 +1131,19 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau return nil } +func (v *vaultClient) 
MarkForRevocation(accessors []*structs.VaultAccessor) error { + if !v.Enabled() { + return nil + } + + if !v.Active() { + return fmt.Errorf("Vault client not active") + } + + v.storeForRevocation(accessors) + return nil +} + // storeForRevocation stores the passed set of accessors for revocation. It // captures their effective TTL by storing their create TTL plus the current // time. diff --git a/nomad/vault_test.go b/nomad/vault_test.go index 99d7c78e40b..3f94d79446e 100644 --- a/nomad/vault_test.go +++ b/nomad/vault_test.go @@ -1365,6 +1365,33 @@ func TestVaultClient_CreateToken_Prestart(t *testing.T) { } } +func TestVaultClient_MarkForRevocation(t *testing.T) { + vconfig := &config.VaultConfig{ + Enabled: helper.BoolToPtr(true), + Token: uuid.Generate(), + Addr: "http://127.0.0.1:0", + } + logger := testlog.HCLogger(t) + client, err := NewVaultClient(vconfig, logger, nil) + require.NoError(t, err) + + client.SetActive(true) + defer client.Stop() + + // Create some VaultAccessors + vas := []*structs.VaultAccessor{ + mock.VaultAccessor(), + mock.VaultAccessor(), + } + + err = client.MarkForRevocation(vas) + require.NoError(t, err) + + // Wasn't committed + require.Len(t, client.revoking, 2) + require.Equal(t, 2, client.stats().TrackedForRevoke) + +} func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) { t.Parallel() vconfig := &config.VaultConfig{ diff --git a/nomad/vault_testing.go b/nomad/vault_testing.go index ba0b1d92394..e3555173701 100644 --- a/nomad/vault_testing.go +++ b/nomad/vault_testing.go @@ -135,6 +135,11 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs return nil } +func (v *TestVaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error { + v.RevokedTokens = append(v.RevokedTokens, accessors...) 
+ return nil +} + func (v *TestVaultClient) Stop() {} func (v *TestVaultClient) SetActive(enabled bool) {} func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil } From 506e441a4dab1314e43f09bbdecd07c12c23b3c5 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 21 May 2020 07:55:36 -0400 Subject: [PATCH 2/8] rate limit revokeDaemon --- nomad/vault.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/nomad/vault.go b/nomad/vault.go index 71263cbbefb..a008885add9 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -1223,6 +1223,16 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V return g.Wait() } +// maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke +// at any given time. +// +// Limiting the revocation batch size is beneficial for few reasons: +// * A single revocation failure of any entry in batch result into retrying the whole batch; +// the larger the batch is the higher likelihood of such failure +// * Smaller batch sizes result into more co-operativeness: provides hooks for +// reconsidering token TTL and leadership steps down. 
+const maxVaultRevokeBatchSize = 1000 + // revokeDaemon should be called in a goroutine and is used to periodically // revoke Vault accessors that failed the original revocation func (v *vaultClient) revokeDaemon() { @@ -1247,13 +1257,21 @@ func (v *vaultClient) revokeDaemon() { } // Build the list of accessors that need to be revoked while pruning any TTL'd checks - revoking := make([]*structs.VaultAccessor, 0, len(v.revoking)) + toRevoke := len(v.revoking) + if toRevoke > maxVaultRevokeBatchSize { + toRevoke = maxVaultRevokeBatchSize + } + revoking := make([]*structs.VaultAccessor, 0, toRevoke) for va, ttl := range v.revoking { if now.After(ttl) { delete(v.revoking, va) } else { revoking = append(revoking, va) } + + if len(revoking) >= toRevoke { + break + } } if err := v.parallelRevoke(context.Background(), revoking); err != nil { From 39fd3b95dda86d5c932f903e822af3bd2edeabe9 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 21 May 2020 08:18:12 -0400 Subject: [PATCH 3/8] apply the same change to consul revocation --- nomad/consul.go | 17 ++++++++++++++++- nomad/consul_test.go | 33 +++++++++++++++++++++++++++++++++ nomad/leader.go | 4 ++-- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/nomad/consul.go b/nomad/consul.go index 14511ad4bad..b83ecbbc94e 100644 --- a/nomad/consul.go +++ b/nomad/consul.go @@ -95,6 +95,9 @@ type ConsulACLsAPI interface { // RevokeTokens instructs Consul to revoke the given token accessors. RevokeTokens(context.Context, []*structs.SITokenAccessor, bool) bool + // MarkForRevocation marks the tokens for background revocation + MarkForRevocation([]*structs.SITokenAccessor) + // Stop is used to stop background token revocations. Intended to be used // on Nomad Server shutdown. 
Stop() @@ -285,6 +288,10 @@ func (c *consulACLsAPI) RevokeTokens(ctx context.Context, accessors []*structs.S return false } +func (c *consulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) { + c.storeForRevocation(accessors) +} + func (c *consulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor) { c.bgRevokeLock.Lock() defer c.bgRevokeLock.Unlock() @@ -369,6 +376,10 @@ func (c *consulACLsAPI) bgRetryRevokeDaemon() { } } +// maxConsulRevocationBatchSize is the maximum tokens a bgRetryRevoke should revoke +// at any given time. +const maxConsulRevocationBatchSize = 1000 + func (c *consulACLsAPI) bgRetryRevoke() { c.bgRevokeLock.Lock() defer c.bgRevokeLock.Unlock() @@ -380,7 +391,11 @@ func (c *consulACLsAPI) bgRetryRevoke() { // unlike vault tokens, SI tokens do not have a TTL, and so we must try to // remove all SI token accessors, every time, until they're gone - toPurge := make([]*structs.SITokenAccessor, len(c.bgRetryRevocation), len(c.bgRetryRevocation)) + toRevoke := len(c.bgRetryRevocation) + if toRevoke > maxConsulRevocationBatchSize { + toRevoke = maxConsulRevocationBatchSize + } + toPurge := make([]*structs.SITokenAccessor, toRevoke) copy(toPurge, c.bgRetryRevocation) if err := c.parallelRevoke(context.Background(), toPurge); err != nil { diff --git a/nomad/consul_test.go b/nomad/consul_test.go index 051c45e3e09..333cd2e3819 100644 --- a/nomad/consul_test.go +++ b/nomad/consul_test.go @@ -65,6 +65,14 @@ func (mps *mockPurgingServer) purgeFunc(accessors []*structs.SITokenAccessor) er } func (m *mockConsulACLsAPI) RevokeTokens(_ context.Context, accessors []*structs.SITokenAccessor, committed bool) bool { + return m.storeForRevocation(accessors, committed) +} + +func (m *mockConsulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) { + m.storeForRevocation(accessors, true) +} + +func (m *mockConsulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor, committed bool) bool { m.lock.Lock() defer 
m.lock.Unlock() @@ -168,6 +176,31 @@ func TestConsulACLsAPI_RevokeTokens(t *testing.T) { }) } +func TestConsulACLsAPI_MarkForRevocation(t *testing.T) { + t.Parallel() + + logger := testlog.HCLogger(t) + aclAPI := consul.NewMockACLsAPI(logger) + + c := NewConsulACLsAPI(aclAPI, logger, nil) + + generated, err := c.CreateToken(context.Background(), ServiceIdentityRequest{ + ClusterID: uuid.Generate(), + AllocID: uuid.Generate(), + TaskName: "task1-sidecar-proxy", + TaskKind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"), + }) + require.NoError(t, err) + + // set the mock error after calling CreateToken for setting up + aclAPI.SetError(nil) + + accessors := []*structs.SITokenAccessor{{AccessorID: generated.AccessorID}} + c.MarkForRevocation(accessors) + require.Len(t, c.bgRetryRevocation, 1) + require.Contains(t, c.bgRetryRevocation, accessors[0]) +} + func TestConsulACLsAPI_bgRetryRevoke(t *testing.T) { t.Parallel() diff --git a/nomad/leader.go b/nomad/leader.go index a246d4f565e..e849dd635ba 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -438,8 +438,8 @@ func (s *Server) revokeSITokenAccessorsOnRestore() error { } if len(toRevoke) > 0 { - ctx := context.Background() - s.consulACLs.RevokeTokens(ctx, toRevoke, true) + s.logger.Info("revoking consul accessors on restore", "accessors", len(toRevoke)) + s.consulACLs.MarkForRevocation(toRevoke) } return nil From adaeb1091b46ae62dea88fc8edce1307e94ce7d0 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 21 May 2020 08:27:17 -0400 Subject: [PATCH 4/8] Reorder leadership handling Start serving RPC immediately after leader components are enabled, and move clean up to the bottom as they don't block leadership responsibilities. 
--- nomad/leader.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index e849dd635ba..8fc63f245fa 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -251,19 +251,16 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Activate the vault client s.vault.SetActive(true) - // Cleanup orphaned Vault token accessors - if err := s.revokeVaultAccessorsOnRestore(); err != nil { - return err - } - - // Cleanup orphaned Service Identity token accessors - if err := s.revokeSITokenAccessorsOnRestore(); err != nil { - return err - } // Enable the periodic dispatcher, since we are now the leader. s.periodicDispatcher.SetEnabled(true) + // Activate RPCs after all leadership components are enabled + // and the server can handle write RPCs + s.setConsistentReadReady() + + // Further clean ups and follow up that don't block RPC consistency + // Restore the periodic dispatcher state if err := s.restorePeriodicDispatcher(); err != nil { return err @@ -313,7 +310,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return err } - s.setConsistentReadReady() + // Cleanup orphaned Vault token accessors + if err := s.revokeVaultAccessorsOnRestore(); err != nil { + return err + } + + // Cleanup orphaned Service Identity token accessors + if err := s.revokeSITokenAccessorsOnRestore(); err != nil { + return err + } return nil } From c7816081a546bc969005c2fc2cd8ec4e00097b1a Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 21 May 2020 19:54:50 -0400 Subject: [PATCH 5/8] vault: ensure ttl expired tokens are purge If a token is scheduled for revocation expires before we revoke it, ensure that it is marked as purged in raft and is only removed from local vault state if the purge operation succeeds. Prior to this change, we may remove the accessor from local state but not purge it from Raft. 
This causes unnecessary churn in the next leadership elections (and until 0.11.2 resulted in indefinite retries). --- nomad/vault.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/nomad/vault.go b/nomad/vault.go index a008885add9..0b68bc807e2 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -1262,9 +1262,10 @@ func (v *vaultClient) revokeDaemon() { toRevoke = maxVaultRevokeBatchSize } revoking := make([]*structs.VaultAccessor, 0, toRevoke) + ttlExpired := []*structs.VaultAccessor{} for va, ttl := range v.revoking { if now.After(ttl) { - delete(v.revoking, va) + ttlExpired = append(ttlExpired, va) } else { revoking = append(revoking, va) } @@ -1283,6 +1284,10 @@ func (v *vaultClient) revokeDaemon() { // Unlock before a potentially expensive operation v.revLock.Unlock() + // purge all explicitly revoked as well as ttl expired tokens + // and only remove them locally on purge success + revoking = append(revoking, ttlExpired...) + // Call the passed in token revocation function if err := v.purgeFn(revoking); err != nil { // Can continue since revocation is idempotent From abda1a1c7adf25a8f3bef2b3862258a46b8c3fbf Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 21 May 2020 21:05:53 -0400 Subject: [PATCH 6/8] always set purgeFunc purgeFunc cannot be nil, so ensure it's set to a no-op function in tests.
--- nomad/consul.go | 4 ++++ nomad/vault.go | 3 +++ 2 files changed, 7 insertions(+) diff --git a/nomad/consul.go b/nomad/consul.go index b83ecbbc94e..7036ee480a6 100644 --- a/nomad/consul.go +++ b/nomad/consul.go @@ -146,6 +146,10 @@ type consulACLsAPI struct { } func NewConsulACLsAPI(aclClient consul.ACLsAPI, logger hclog.Logger, purgeFunc PurgeSITokenAccessorFunc) *consulACLsAPI { + if purgeFunc == nil { + purgeFunc = func([]*structs.SITokenAccessor) error { return nil } + } + c := &consulACLsAPI{ aclClient: aclClient, limiter: rate.NewLimiter(siTokenRequestRateLimit, int(siTokenRequestRateLimit)), diff --git a/nomad/vault.go b/nomad/vault.go index 0b68bc807e2..1d943fc4d3a 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -252,6 +252,9 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault if logger == nil { return nil, fmt.Errorf("must pass valid logger") } + if purgeFn == nil { + purgeFn = func(accessors []*structs.VaultAccessor) error { return nil } + } v := &vaultClient{ config: c, From e9d14a3e19a7100c4a034cafa79984c7677abea4 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 31 May 2020 21:04:39 -0400 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: Michael Schurter --- nomad/leader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index 8fc63f245fa..cbf8216c83b 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -395,7 +395,7 @@ func (s *Server) revokeVaultAccessorsOnRestore() error { } if len(revoke) != 0 { - s.logger.Info("revoking vault accessors on restore", "accessors", len(revoke)) + s.logger.Info("revoking vault accessors after becoming leader", "accessors", len(revoke)) if err := s.vault.MarkForRevocation(revoke); err != nil { return fmt.Errorf("failed to revoke tokens: %v", err) @@ -443,7 +443,7 @@ func (s *Server) revokeSITokenAccessorsOnRestore() error { } if len(toRevoke) > 0 { - s.logger.Info("revoking consul accessors on 
restore", "accessors", len(toRevoke)) + s.logger.Info("revoking consul accessors after becoming leader", "accessors", len(toRevoke)) s.consulACLs.MarkForRevocation(toRevoke) } From 127a960a548e64734b33b18c9368a10e15c06bff Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Sun, 31 May 2020 21:19:33 -0400 Subject: [PATCH 8/8] clarify rpc consistency readiness comment --- nomad/leader.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index cbf8216c83b..26b81871fea 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -255,8 +255,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Enable the periodic dispatcher, since we are now the leader. s.periodicDispatcher.SetEnabled(true) - // Activate RPCs after all leadership components are enabled - // and the server can handle write RPCs + // Activate RPC now that local FSM caught up with Raft (as evident by Barrier call success) + // and all leader related components (e.g. broker queue) are enabled. + // Auxiliary processes (e.g. background, bookkeeping, and cleanup tasks can start after) s.setConsistentReadReady() // Further clean ups and follow up that don't block RPC consistency