Skip to content

Commit

Permalink
vault: expired tokens count toward batch limit
Browse files Browse the repository at this point in the history
As of 0.11.3 Vault token revocation and purging was done in batches.
However the batch size was only limited by the number of *non-expired*
tokens being revoked.

Due to bugs prior to 0.11.3, *expired* tokens were not properly purged.
Long-lived clusters could have thousands to *millions* of very old
expired tokens that never got purged from the state store.

Since these expired tokens did not count against the batch limit, very
large batches could be created and overwhelm servers.

This commit ensures expired tokens count toward the batch limit with
this one line change:

```
- if len(revoking) >= toRevoke {
+ if len(revoking)+len(ttlExpired) >= toRevoke {
```

However, this code was difficult to test due to being in a periodically
executing loop. Most of the changes are to make this one line change
testable and test it.
  • Loading branch information
schmichael committed Jul 28, 2020
1 parent 9d190a6 commit 4a14604
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 13 deletions.
36 changes: 24 additions & 12 deletions nomad/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ type vaultClient struct {
// setConfigLock serializes access to the SetConfig method
setConfigLock sync.Mutex

// consts as struct fields for overriding in tests
maxRevokeBatchSize int
revocationIntv time.Duration

entHandler taskClientHandler
}

Expand All @@ -267,13 +271,15 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault
}

v := &vaultClient{
config: c,
logger: logger.Named("vault"),
limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
revoking: make(map[*structs.VaultAccessor]time.Time),
purgeFn: purgeFn,
tomb: &tomb.Tomb{},
entHandler: delegate,
config: c,
logger: logger.Named("vault"),
limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
revoking: make(map[*structs.VaultAccessor]time.Time),
purgeFn: purgeFn,
tomb: &tomb.Tomb{},
maxRevokeBatchSize: maxVaultRevokeBatchSize,
revocationIntv: vaultRevocationIntv,
entHandler: delegate,
}

if v.config.IsEnabled() {
Expand Down Expand Up @@ -1259,19 +1265,22 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
}

// maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke
// at any given time.
// and purge at any given time.
//
// Limiting the revocation batch size is beneficial for few reasons:
// * A single revocation failure of any entry in batch result into retrying the whole batch;
// the larger the batch is the higher likelihood of such failure
// * Smaller batch sizes result into more co-operativeness: provides hooks for
// reconsidering token TTL and leadership steps down.
// * Batches limit the size of the Raft message purging tokens. Due to bugs
// pre-0.11.3, expired tokens were not properly purged, so users upgrading from
// older versions may have huge numbers (millions) of expired tokens to purge.
const maxVaultRevokeBatchSize = 1000

// revokeDaemon should be called in a goroutine and is used to periodically
// revoke Vault accessors that failed the original revocation
func (v *vaultClient) revokeDaemon() {
ticker := time.NewTicker(vaultRevocationIntv)
ticker := time.NewTicker(v.revocationIntv)
defer ticker.Stop()

for {
Expand All @@ -1293,8 +1302,8 @@ func (v *vaultClient) revokeDaemon() {

// Build the list of accessors that need to be revoked while pruning any TTL'd checks
toRevoke := len(v.revoking)
if toRevoke > maxVaultRevokeBatchSize {
toRevoke = maxVaultRevokeBatchSize
if toRevoke > v.maxRevokeBatchSize {
toRevoke = v.maxRevokeBatchSize
}
revoking := make([]*structs.VaultAccessor, 0, toRevoke)
ttlExpired := []*structs.VaultAccessor{}
Expand All @@ -1305,7 +1314,10 @@ func (v *vaultClient) revokeDaemon() {
revoking = append(revoking, va)
}

if len(revoking) >= toRevoke {
// Batches should consider tokens to be revoked
// as well as expired tokens to ensure the Raft
// message is reasonably sized.
if len(revoking)+len(ttlExpired) >= toRevoke {
break
}
}
Expand Down
78 changes: 77 additions & 1 deletion nomad/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1436,7 +1437,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
}
}

// TestVaultClient_RevokeTokens_failures_TTL asserts that
// TestVaultClient_RevokeTokens_Failures_TTL asserts that
// the registered TTL doesn't get extended on retries
func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -1694,6 +1695,81 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) {
require.Errorf(t, err, "failed to purge token: %v", s)
}

// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation
// batches are bounded in size.
func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) {
t.Parallel()
v := testutil.NewTestVault(t)
defer v.Stop()

// Set the configs token in a new test role
v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5)

// Disable client until we can change settings for testing
conf := v.Config.Copy()
conf.Enabled = helper.BoolToPtr(false)

const (
batchSize = 100
batches = 3
)
resultCh := make(chan error, batches)
var totalPurges int64

// Purge function asserts batches are always < batchSize
purge := func(vas []*structs.VaultAccessor) error {
if len(vas) > batchSize {
resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize)
} else {
resultCh <- nil
}
atomic.AddInt64(&totalPurges, int64(len(vas)))

return nil
}

logger := testlog.HCLogger(t)
client, err := NewVaultClient(conf, logger, purge, nil)
require.NoError(t, err)

// Override settings for testing and then enable client
client.maxRevokeBatchSize = batchSize
client.revocationIntv = 3 * time.Millisecond
conf = v.Config.Copy()
conf.Enabled = helper.BoolToPtr(true)
require.NoError(t, client.SetConfig(conf))

client.SetActive(true)
defer client.Stop()

waitForConnection(client, t)

// Create more tokens in Nomad than can fit in a batch; they don't need
// to exist in Vault.
accessors := make([]*structs.VaultAccessor, batchSize*batches)
for i := 0; i < len(accessors); i++ {
accessors[i] = &structs.VaultAccessor{Accessor: "abcd"}
}

// Mark for revocation
require.NoError(t, client.MarkForRevocation(accessors))

// Wait for tokens to be revoked
for i := 0; i < batches; i++ {
select {
case err := <-resultCh:
require.NoError(t, err)
case <-time.After(10 * time.Second):
// 10 seconds should be plenty long to process 3
// batches at a 3ms tick interval!
t.Errorf("timed out processing %d batches. %d/%d complete in 10s",
batches, i, batches)
}
}

require.Equal(t, int64(len(accessors)), atomic.LoadInt64(&totalPurges))
}

func waitForConnection(v *vaultClient, t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
return v.ConnectionEstablished()
Expand Down

0 comments on commit 4a14604

Please sign in to comment.