Skip to content

Commit

Permalink
Merge pull request #8553 from hashicorp/b-vault-revoke-batch
Browse files Browse the repository at this point in the history
vault: expired tokens count toward batch limit
  • Loading branch information
schmichael authored Jul 29, 2020
2 parents 9d190a6 + 4a14604 commit 7eee26a
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 13 deletions.
36 changes: 24 additions & 12 deletions nomad/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ type vaultClient struct {
// setConfigLock serializes access to the SetConfig method
setConfigLock sync.Mutex

// consts as struct fields for overriding in tests
maxRevokeBatchSize int
revocationIntv time.Duration

entHandler taskClientHandler
}

Expand All @@ -267,13 +271,15 @@ func NewVaultClient(c *config.VaultConfig, logger log.Logger, purgeFn PurgeVault
}

v := &vaultClient{
config: c,
logger: logger.Named("vault"),
limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
revoking: make(map[*structs.VaultAccessor]time.Time),
purgeFn: purgeFn,
tomb: &tomb.Tomb{},
entHandler: delegate,
config: c,
logger: logger.Named("vault"),
limiter: rate.NewLimiter(requestRateLimit, int(requestRateLimit)),
revoking: make(map[*structs.VaultAccessor]time.Time),
purgeFn: purgeFn,
tomb: &tomb.Tomb{},
maxRevokeBatchSize: maxVaultRevokeBatchSize,
revocationIntv: vaultRevocationIntv,
entHandler: delegate,
}

if v.config.IsEnabled() {
Expand Down Expand Up @@ -1259,19 +1265,22 @@ func (v *vaultClient) parallelRevoke(ctx context.Context, accessors []*structs.V
}

// maxVaultRevokeBatchSize is the maximum tokens a revokeDaemon should revoke
// at any given time.
// and purge at any given time.
//
// Limiting the revocation batch size is beneficial for few reasons:
// * A single revocation failure of any entry in batch result into retrying the whole batch;
// the larger the batch is the higher likelihood of such failure
// * Smaller batch sizes result into more co-operativeness: provides hooks for
// reconsidering token TTL and leadership steps down.
// * Batches limit the size of the Raft message purging tokens. Due to bugs
// pre-0.11.3, expired tokens were not properly purged, so users upgrading from
// older versions may have huge numbers (millions) of expired tokens to purge.
const maxVaultRevokeBatchSize = 1000

// revokeDaemon should be called in a goroutine and is used to periodically
// revoke Vault accessors that failed the original revocation
func (v *vaultClient) revokeDaemon() {
ticker := time.NewTicker(vaultRevocationIntv)
ticker := time.NewTicker(v.revocationIntv)
defer ticker.Stop()

for {
Expand All @@ -1293,8 +1302,8 @@ func (v *vaultClient) revokeDaemon() {

// Build the list of accessors that need to be revoked while pruning any TTL'd checks
toRevoke := len(v.revoking)
if toRevoke > maxVaultRevokeBatchSize {
toRevoke = maxVaultRevokeBatchSize
if toRevoke > v.maxRevokeBatchSize {
toRevoke = v.maxRevokeBatchSize
}
revoking := make([]*structs.VaultAccessor, 0, toRevoke)
ttlExpired := []*structs.VaultAccessor{}
Expand All @@ -1305,7 +1314,10 @@ func (v *vaultClient) revokeDaemon() {
revoking = append(revoking, va)
}

if len(revoking) >= toRevoke {
// Batches should consider tokens to be revoked
// as well as expired tokens to ensure the Raft
// message is reasonably sized.
if len(revoking)+len(ttlExpired) >= toRevoke {
break
}
}
Expand Down
78 changes: 77 additions & 1 deletion nomad/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1436,7 +1437,7 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
}
}

// TestVaultClient_RevokeTokens_failures_TTL asserts that
// TestVaultClient_RevokeTokens_Failures_TTL asserts that
// the registered TTL doesn't get extended on retries
func TestVaultClient_RevokeTokens_Failures_TTL(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -1694,6 +1695,81 @@ func TestVaultClient_RevokeTokens_Idempotent(t *testing.T) {
require.Errorf(t, err, "failed to purge token: %v", s)
}

// TestVaultClient_RevokeDaemon_Bounded asserts that token revocation
// batches are bounded in size.
func TestVaultClient_RevokeDaemon_Bounded(t *testing.T) {
t.Parallel()
v := testutil.NewTestVault(t)
defer v.Stop()

// Set the configs token in a new test role
v.Config.Token = defaultTestVaultWhitelistRoleAndToken(v, t, 5)

// Disable client until we can change settings for testing
conf := v.Config.Copy()
conf.Enabled = helper.BoolToPtr(false)

const (
batchSize = 100
batches = 3
)
resultCh := make(chan error, batches)
var totalPurges int64

// Purge function asserts batches are always < batchSize
purge := func(vas []*structs.VaultAccessor) error {
if len(vas) > batchSize {
resultCh <- fmt.Errorf("too many Vault accessors in batch: %d > %d", len(vas), batchSize)
} else {
resultCh <- nil
}
atomic.AddInt64(&totalPurges, int64(len(vas)))

return nil
}

logger := testlog.HCLogger(t)
client, err := NewVaultClient(conf, logger, purge, nil)
require.NoError(t, err)

// Override settings for testing and then enable client
client.maxRevokeBatchSize = batchSize
client.revocationIntv = 3 * time.Millisecond
conf = v.Config.Copy()
conf.Enabled = helper.BoolToPtr(true)
require.NoError(t, client.SetConfig(conf))

client.SetActive(true)
defer client.Stop()

waitForConnection(client, t)

// Create more tokens in Nomad than can fit in a batch; they don't need
// to exist in Vault.
accessors := make([]*structs.VaultAccessor, batchSize*batches)
for i := 0; i < len(accessors); i++ {
accessors[i] = &structs.VaultAccessor{Accessor: "abcd"}
}

// Mark for revocation
require.NoError(t, client.MarkForRevocation(accessors))

// Wait for tokens to be revoked
for i := 0; i < batches; i++ {
select {
case err := <-resultCh:
require.NoError(t, err)
case <-time.After(10 * time.Second):
// 10 seconds should be plenty long to process 3
// batches at a 3ms tick interval!
t.Errorf("timed out processing %d batches. %d/%d complete in 10s",
batches, i, batches)
}
}

require.Equal(t, int64(len(accessors)), atomic.LoadInt64(&totalPurges))
}

func waitForConnection(v *vaultClient, t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
return v.ConnectionEstablished()
Expand Down

0 comments on commit 7eee26a

Please sign in to comment.