Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up leadership establishment #8036

Merged
merged 8 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion nomad/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type ConsulACLsAPI interface {
// RevokeTokens instructs Consul to revoke the given token accessors.
RevokeTokens(context.Context, []*structs.SITokenAccessor, bool) bool

// MarkForRevocation marks the tokens for background revocation
MarkForRevocation([]*structs.SITokenAccessor)

// Stop is used to stop background token revocations. Intended to be used
// on Nomad Server shutdown.
Stop()
Expand Down Expand Up @@ -285,6 +288,10 @@ func (c *consulACLsAPI) RevokeTokens(ctx context.Context, accessors []*structs.S
return false
}

// MarkForRevocation hands the given SI token accessors to storeForRevocation
// so that they are picked up by background revocation instead of being revoked
// by this call. It returns nothing and reports no error.
func (c *consulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
c.storeForRevocation(accessors)
}

func (c *consulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor) {
c.bgRevokeLock.Lock()
defer c.bgRevokeLock.Unlock()
Expand Down Expand Up @@ -369,6 +376,10 @@ func (c *consulACLsAPI) bgRetryRevokeDaemon() {
}
}

// maxConsulRevocationBatchSize caps how many SI token accessors a single
// bgRetryRevoke pass copies out of the pending list and attempts to revoke.
const maxConsulRevocationBatchSize = 1000

func (c *consulACLsAPI) bgRetryRevoke() {
c.bgRevokeLock.Lock()
defer c.bgRevokeLock.Unlock()
Expand All @@ -380,7 +391,11 @@ func (c *consulACLsAPI) bgRetryRevoke() {

// unlike vault tokens, SI tokens do not have a TTL, and so we must try to
// remove all SI token accessors, every time, until they're gone
toPurge := make([]*structs.SITokenAccessor, len(c.bgRetryRevocation), len(c.bgRetryRevocation))
toRevoke := len(c.bgRetryRevocation)
if toRevoke > maxConsulRevocationBatchSize {
toRevoke = maxConsulRevocationBatchSize
}
toPurge := make([]*structs.SITokenAccessor, toRevoke)
copy(toPurge, c.bgRetryRevocation)

if err := c.parallelRevoke(context.Background(), toPurge); err != nil {
Expand Down
33 changes: 33 additions & 0 deletions nomad/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func (mps *mockPurgingServer) purgeFunc(accessors []*structs.SITokenAccessor) er
}

// RevokeTokens is the mock implementation of token revocation. It ignores the
// context and delegates to storeForRevocation, forwarding the accessors and
// the committed flag unchanged and returning its result.
func (m *mockConsulACLsAPI) RevokeTokens(_ context.Context, accessors []*structs.SITokenAccessor, committed bool) bool {
return m.storeForRevocation(accessors, committed)
}

// MarkForRevocation is the mock implementation of background revocation
// marking. It delegates to storeForRevocation with committed set to true and
// discards the boolean result.
func (m *mockConsulACLsAPI) MarkForRevocation(accessors []*structs.SITokenAccessor) {
m.storeForRevocation(accessors, true)
}

func (m *mockConsulACLsAPI) storeForRevocation(accessors []*structs.SITokenAccessor, committed bool) bool {
m.lock.Lock()
defer m.lock.Unlock()

Expand Down Expand Up @@ -168,6 +176,31 @@ func TestConsulACLsAPI_RevokeTokens(t *testing.T) {
})
}

// TestConsulACLsAPI_MarkForRevocation verifies that MarkForRevocation places
// the given accessor in the bgRetryRevocation list.
func TestConsulACLsAPI_MarkForRevocation(t *testing.T) {
t.Parallel()

logger := testlog.HCLogger(t)
aclAPI := consul.NewMockACLsAPI(logger)

c := NewConsulACLsAPI(aclAPI, logger, nil)

// create a token through the mock so there is an accessor ID to mark
generated, err := c.CreateToken(context.Background(), ServiceIdentityRequest{
ClusterID: uuid.Generate(),
AllocID: uuid.Generate(),
TaskName: "task1-sidecar-proxy",
TaskKind: structs.NewTaskKind(structs.ConnectProxyPrefix, "service1"),
})
require.NoError(t, err)

// explicitly set a nil error on the mock once the CreateToken setup is done
// NOTE(review): presumably a no-op carried over from the RevokeTokens test
// setup, which sets a non-nil error at this point — confirm
aclAPI.SetError(nil)

// marking must leave exactly this accessor queued in bgRetryRevocation
accessors := []*structs.SITokenAccessor{{AccessorID: generated.AccessorID}}
c.MarkForRevocation(accessors)
require.Len(t, c.bgRetryRevocation, 1)
require.Contains(t, c.bgRetryRevocation, accessors[0])
}

func TestConsulACLsAPI_bgRetryRevoke(t *testing.T) {
t.Parallel()

Expand Down
33 changes: 20 additions & 13 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,16 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {

// Activate the vault client
s.vault.SetActive(true)
// Cleanup orphaned Vault token accessors
if err := s.revokeVaultAccessorsOnRestore(); err != nil {
return err
}

// Cleanup orphaned Service Identity token accessors
if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
return err
}

// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)

// Activate RPCs after all leadership components are enabled
// and the server can handle write RPCs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems a bit off now that not all "leadership components" are enabled at this point (although I guess all of them that set the SetEnabled pattern are).

Is there a specific operation that makes the leader ready to serve consistent reads? If so let's specifically call that one out so that as we add/remove/move calls in this func we have a way to determine whether they need to come before or after this point.

s.setConsistentReadReady()

// Further cleanup and follow-up work that does not block RPC consistency

// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
return err
Expand Down Expand Up @@ -313,7 +310,15 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}

s.setConsistentReadReady()
// Cleanup orphaned Vault token accessors
if err := s.revokeVaultAccessorsOnRestore(); err != nil {
return err
}

// Cleanup orphaned Service Identity token accessors
if err := s.revokeSITokenAccessorsOnRestore(); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -390,7 +395,9 @@ func (s *Server) revokeVaultAccessorsOnRestore() error {
}

if len(revoke) != 0 {
if err := s.vault.RevokeTokens(context.Background(), revoke, true); err != nil {
s.logger.Info("revoking vault accessors on restore", "accessors", len(revoke))
notnoop marked this conversation as resolved.
Show resolved Hide resolved

if err := s.vault.MarkForRevocation(revoke); err != nil {
return fmt.Errorf("failed to revoke tokens: %v", err)
}
}
Expand Down Expand Up @@ -436,8 +443,8 @@ func (s *Server) revokeSITokenAccessorsOnRestore() error {
}

if len(toRevoke) > 0 {
ctx := context.Background()
s.consulACLs.RevokeTokens(ctx, toRevoke, true)
s.logger.Info("revoking consul accessors on restore", "accessors", len(toRevoke))
notnoop marked this conversation as resolved.
Show resolved Hide resolved
s.consulACLs.MarkForRevocation(toRevoke)
}

return nil
Expand Down
36 changes: 35 additions & 1 deletion nomad/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type VaultClient interface {
// RevokeTokens takes a set of tokens accessor and revokes the tokens
RevokeTokens(ctx context.Context, accessors []*structs.VaultAccessor, committed bool) error

// MarkForRevocation stores the given token accessors so they are revoked in the background
MarkForRevocation(accessors []*structs.VaultAccessor) error

// Stop is used to stop token renewal
Stop()

Expand Down Expand Up @@ -1128,6 +1131,19 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return nil
}

// MarkForRevocation records the given accessors for later revocation by
// passing them to storeForRevocation.
//
// It returns nil without storing anything when the client is not enabled, and
// returns an error when the client is enabled but not active.
func (v *vaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
	switch {
	case !v.Enabled():
		// A client that is not enabled tracks nothing; this is not an error.
		return nil
	case !v.Active():
		return fmt.Errorf("Vault client not active")
	}

	v.storeForRevocation(accessors)
	return nil
}

// storeForRevocation stores the passed set of accessors for revocation. It
// captures their effective TTL by storing their create TTL plus the current
// time.
Expand Down Expand Up @@ -1207,6 +1223,16 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
return g.Wait()
}

// maxVaultRevokeBatchSize is the maximum number of tokens the revokeDaemon
// should attempt to revoke in a single pass.
//
// Limiting the revocation batch size is beneficial for a few reasons:
// * A single revocation failure of any entry in a batch results in retrying
//   the whole batch; the larger the batch, the higher the likelihood of such
//   a failure.
// * Smaller batch sizes are more cooperative: they provide hooks for
//   reconsidering token TTLs and for leadership step-downs.
const maxVaultRevokeBatchSize = 1000

// revokeDaemon should be called in a goroutine and is used to periodically
// revoke Vault accessors that failed the original revocation
func (v *vaultClient) revokeDaemon() {
Expand All @@ -1231,13 +1257,21 @@ func (v *vaultClient) revokeDaemon() {
}

// Build the list of accessors that need to be revoked while pruning any TTL'd checks
revoking := make([]*structs.VaultAccessor, 0, len(v.revoking))
toRevoke := len(v.revoking)
if toRevoke > maxVaultRevokeBatchSize {
toRevoke = maxVaultRevokeBatchSize
}
revoking := make([]*structs.VaultAccessor, 0, toRevoke)
for va, ttl := range v.revoking {
if now.After(ttl) {
delete(v.revoking, va)
schmichael marked this conversation as resolved.
Show resolved Hide resolved
} else {
revoking = append(revoking, va)
}

if len(revoking) >= toRevoke {
break
}
}

if err := v.parallelRevoke(context.Background(), revoking); err != nil {
Expand Down
27 changes: 27 additions & 0 deletions nomad/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,33 @@ func TestVaultClient_CreateToken_Prestart(t *testing.T) {
}
}

// TestVaultClient_MarkForRevocation verifies that MarkForRevocation on an
// enabled, active client tracks every supplied accessor for revocation.
func TestVaultClient_MarkForRevocation(t *testing.T) {
	// Run alongside the other tests in this package, as the sibling tests do.
	t.Parallel()

	vconfig := &config.VaultConfig{
		Enabled: helper.BoolToPtr(true),
		Token:   uuid.Generate(),
		Addr:    "http://127.0.0.1:0",
	}
	logger := testlog.HCLogger(t)
	client, err := NewVaultClient(vconfig, logger, nil)
	require.NoError(t, err)

	// MarkForRevocation returns an error on an inactive client, so activate it.
	client.SetActive(true)
	defer client.Stop()

	// Create some VaultAccessors
	vas := []*structs.VaultAccessor{
		mock.VaultAccessor(),
		mock.VaultAccessor(),
	}

	err = client.MarkForRevocation(vas)
	require.NoError(t, err)

	// Both accessors should now be tracked for revocation
	require.Len(t, client.revoking, 2)
	require.Equal(t, 2, client.stats().TrackedForRevoke)
}
func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
t.Parallel()
vconfig := &config.VaultConfig{
Expand Down
5 changes: 5 additions & 0 deletions nomad/vault_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs
return nil
}

// MarkForRevocation appends the given accessors to RevokedTokens and always
// returns nil.
func (v *TestVaultClient) MarkForRevocation(accessors []*structs.VaultAccessor) error {
v.RevokedTokens = append(v.RevokedTokens, accessors...)
return nil
}

// Stop is a no-op on the test client.
func (v *TestVaultClient) Stop() {}
// SetActive is a no-op on the test client; the enabled argument is ignored.
func (v *TestVaultClient) SetActive(enabled bool) {}
// SetConfig ignores the given configuration and always returns nil.
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
Expand Down