Skip to content

Commit

Permalink
Merge pull request #2318 from hashicorp/f-vault-debug
Browse files Browse the repository at this point in the history
Server side Vault telemetry
  • Loading branch information
dadgar authored Feb 17, 2017
2 parents 16a8a11 + 22b64a5 commit da0c005
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 14 deletions.
8 changes: 7 additions & 1 deletion client/vaultclient/vaultclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,13 @@ func (c *vaultClient) DeriveToken(alloc *structs.Allocation, taskNames []string)
// Use the token supplied to interact with vault
c.client.SetToken("")

return c.tokenDeriver(alloc, taskNames, c.client)
tokens, err := c.tokenDeriver(alloc, taskNames, c.client)
if err != nil {
c.logger.Printf("[ERR] client.vault: failed to derive token for allocation %q and tasks %v: %v", alloc.ID, taskNames, err)
return nil, err
}

return tokens, nil
}

// GetConsulACL creates a vault API client and reads from vault a consul ACL
Expand Down
8 changes: 4 additions & 4 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,

secret, err := n.srv.vault.CreateToken(ctx, alloc, task)
if err != nil {
wrapped := fmt.Errorf("failed to create token for task %q: %v", task, err)
wrapped := fmt.Errorf("failed to create token for task %q on alloc %q: %v", task, alloc.ID, err)
if rerr, ok := err.(*structs.RecoverableError); ok && rerr.Recoverable {
// If the error is recoverable, propogate it
return structs.NewRecoverableError(wrapped, true)
Expand Down Expand Up @@ -1117,10 +1117,10 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,

// If there was an error revoke the created tokens
if createErr != nil {
n.srv.logger.Printf("[ERR] nomad.node: Vault token creation failed: %v", createErr)
n.srv.logger.Printf("[ERR] nomad.node: Vault token creation for alloc %q failed: %v", alloc.ID, createErr)

if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil {
n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation failed: %v", revokeErr)
n.srv.logger.Printf("[ERR] nomad.node: Vault token revocation for alloc %q failed: %v", alloc.ID, revokeErr)
}

if rerr, ok := createErr.(*structs.RecoverableError); ok {
Expand All @@ -1136,7 +1136,7 @@ func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest,
req := structs.VaultAccessorsRequest{Accessors: accessors}
_, index, err := n.srv.raftApply(structs.VaultAccessorRegisterRequestType, &req)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors failed: %v", err)
n.srv.logger.Printf("[ERR] nomad.client: Register Vault accessors for alloc %q failed: %v", alloc.ID, err)

// Determine if we can recover from the error
retry := false
Expand Down
3 changes: 3 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger)
// Emit metrics for the blocked eval tracker.
go blockedEvals.EmitStats(time.Second, s.shutdownCh)

// Emit metrics for the Vault client.
go s.vault.EmitStats(time.Second, s.shutdownCh)

// Emit metrics
go s.heartbeatStats()

Expand Down
86 changes: 81 additions & 5 deletions nomad/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"gopkg.in/tomb.v2"

metrics "github.com/armon/go-metrics"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
Expand Down Expand Up @@ -135,6 +136,21 @@ type VaultClient interface {

// Running returns whether the Vault client is running
Running() bool

// Stats returns the Vault clients statistics
Stats() *VaultStats

// EmitStats emits that clients statistics at the given period until stopCh
// is called.
EmitStats(period time.Duration, stopCh chan struct{})
}

// VaultStats returns all the stats about Vault tokens created and managed by
// Nomad.
type VaultStats struct {
// TrackedForRevoke is the count of tokens that are being tracked to be
// revoked since they could not be immediately revoked.
TrackedForRevoke int
}

// PurgeVaultAccessor is called to remove VaultAccessors from the system. If
Expand Down Expand Up @@ -204,6 +220,10 @@ type vaultClient struct {
tomb *tomb.Tomb
logger *log.Logger

// stats stores the stats
stats *VaultStats
statsLock sync.RWMutex

// l is used to lock the configuration aspects of the client such that
// multiple callers can't cause conflicting config updates
l sync.Mutex
Expand All @@ -227,6 +247,7 @@ func NewVaultClient(c *config.VaultConfig, logger *log.Logger, purgeFn PurgeVaul
revoking: make(map[*structs.VaultAccessor]time.Time),
purgeFn: purgeFn,
tomb: &tomb.Tomb{},
stats: new(VaultStats),
}

if v.config.IsEnabled() {
Expand Down Expand Up @@ -821,7 +842,6 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta
if !v.Enabled() {
return nil, fmt.Errorf("Vault integration disabled")
}

if !v.Active() {
return nil, structs.NewRecoverableError(fmt.Errorf("Vault client not active"), true)
}
Expand All @@ -833,6 +853,9 @@ func (v *vaultClient) CreateToken(ctx context.Context, a *structs.Allocation, ta
return nil, fmt.Errorf("Connection to Vault failed: %v", err)
}

// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "create_token"}, time.Now())

// Retrieve the Vault block for the task
policies := a.Job.VaultPolicies()
if policies == nil {
Expand Down Expand Up @@ -908,6 +931,9 @@ func (v *vaultClient) LookupToken(ctx context.Context, token string) (*vapi.Secr
return nil, fmt.Errorf("Connection to Vault failed: %v", err)
}

// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "lookup_token"}, time.Now())

// Ensure we are under our rate limit
if err := v.limiter.Wait(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -943,6 +969,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return fmt.Errorf("Vault client not active")
}

// Track how long the request takes
defer metrics.MeasureSince([]string{"nomad", "vault", "revoke_tokens"}, time.Now())

// Check if we have established a connection with Vault. If not just add it
// to the queue
if established, err := v.ConnectionEstablished(); !established && err == nil {
Expand All @@ -952,22 +981,29 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
v.storeForRevocation(accessors)
}

// Track that we are abandoning these accessors.
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors)))
return nil
}

// Attempt to revoke immediately and if it fails, add it to the revoke queue
err := v.parallelRevoke(ctx, accessors)
if !committed {
if err != nil {
// If it is uncommitted, it is a best effort revoke as it will shortly
// TTL within the cubbyhole and has not been leaked to any outside
// system
return nil
}
if !committed {
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_abandoned"}, float32(len(accessors)))
return nil
}

if err != nil {
v.logger.Printf("[WARN] vault: failed to revoke tokens. Will reattempt til TTL: %v", err)
v.storeForRevocation(accessors)
return nil
} else if !committed {
// Mark that it was revoked but there is nothing to purge so exit
metrics.IncrCounter([]string{"nomad", "vault", "undistributed_tokens_revoked"}, float32(len(accessors)))
return nil
}

if err := v.purgeFn(accessors); err != nil {
Expand All @@ -976,6 +1012,9 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
return nil
}

// Track that it was revoked successfully
metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(accessors)))

return nil
}

Expand All @@ -984,10 +1023,13 @@ func (v *vaultClient) RevokeTokens(ctx context.Context, accessors []*structs.Vau
// time.
func (v *vaultClient) storeForRevocation(accessors []*structs.VaultAccessor) {
v.revLock.Lock()
v.statsLock.Lock()
now := time.Now()
for _, a := range accessors {
v.revoking[a] = now.Add(time.Duration(a.CreationTTL) * time.Second)
}
v.stats.TrackedForRevoke = len(v.revoking)
v.statsLock.Unlock()
v.revLock.Unlock()
}

Expand Down Expand Up @@ -1103,12 +1145,19 @@ func (v *vaultClient) revokeDaemon() {
continue
}

// Track that tokens were revoked successfully
metrics.IncrCounter([]string{"nomad", "vault", "distributed_tokens_revoked"}, float32(len(revoking)))

// Can delete from the tracked list now that we have purged
v.revLock.Lock()
v.statsLock.Lock()
for _, va := range revoking {
delete(v.revoking, va)
}
v.stats.TrackedForRevoke = len(v.revoking)
v.statsLock.Unlock()
v.revLock.Unlock()

}
}
}
Expand Down Expand Up @@ -1137,3 +1186,30 @@ func (v *vaultClient) setLimit(l rate.Limit) {
defer v.l.Unlock()
v.limiter = rate.NewLimiter(l, int(l))
}

// Stats is used to query the state of the blocked eval tracker.
func (v *vaultClient) Stats() *VaultStats {
// Allocate a new stats struct
stats := new(VaultStats)

v.statsLock.RLock()
defer v.statsLock.RUnlock()

// Copy all the stats
stats.TrackedForRevoke = v.stats.TrackedForRevoke

return stats
}

// EmitStats is used to export metrics about the blocked eval tracker while enabled
func (v *vaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {
for {
select {
case <-time.After(period):
stats := v.Stats()
metrics.SetGauge([]string{"nomad", "vault", "distributed_tokens_revoking"}, float32(stats.TrackedForRevoke))
case <-stopCh:
return
}
}
}
4 changes: 4 additions & 0 deletions nomad/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,10 @@ func TestVaultClient_RevokeTokens_PreEstablishs(t *testing.T) {
if len(client.revoking) != 2 {
t.Fatalf("didn't add to revoke loop")
}

if client.Stats().TrackedForRevoke != 2 {
t.Fatalf("didn't add to revoke loop")
}
}

func TestVaultClient_RevokeTokens_Root(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions nomad/vault_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nomad

import (
"context"
"time"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
Expand Down Expand Up @@ -134,7 +135,9 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs
return nil
}

func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stats() *VaultStats { return new(VaultStats) }
func (v *TestVaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {}

0 comments on commit da0c005

Please sign in to comment.