Skip to content

Commit

Permalink
adding restore of lease when loaded during restoration
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Hoffman committed Aug 30, 2017
1 parent 67f6cf8 commit 8fc8fbd
Showing 1 changed file with 84 additions and 60 deletions.
144 changes: 84 additions & 60 deletions vault/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,18 @@ const (
// If a secret is not renewed in timely manner, it may be expired, and
// the ExpirationManager will handle doing automatic revocation.
type ExpirationManager struct {
l sync.RWMutex

router *Router
idView *BarrierView
tokenView *BarrierView
tokenStore *TokenStore
logger log.Logger

pending map[string]*time.Timer
pendingLock sync.RWMutex
pending map[string]*time.Timer

tidyLock int64
tidyLock int64
restoreLock int64
}

// NewExpirationManager creates a new ExpirationManager that is backed
Expand Down Expand Up @@ -98,18 +100,7 @@ func (c *Core) setupExpiration() error {
// Link the token store to this
c.tokenStore.SetExpirationManager(mgr)

// Restore the existing state
c.logger.Info("expiration: restoring leases")

// Accumulate existing leases
c.logger.Debug("expiration: collecting leases")
existing, err := logical.CollectKeys(mgr.idView)
if err != nil {
return fmt.Errorf("failed to scan for leases: %v", err)
}
c.logger.Debug("expiration: leases collected", "num_existing", len(existing))

go c.expiration.Restore(existing)
go c.expiration.Restore(c.Shutdown, 1*time.Second)

return nil
}
Expand Down Expand Up @@ -206,11 +197,11 @@ func (m *ExpirationManager) Tidy() error {
} else {
if isValid {
return
} else {
m.logger.Trace("expiration: revoking lease which contains an invalid token", "lease_id", leaseID)
revokeLease = true
deletedCountInvalidToken++
}

m.logger.Trace("expiration: revoking lease which contains an invalid token", "lease_id", leaseID)
revokeLease = true
deletedCountInvalidToken++
goto REVOKE_CHECK
}

Expand Down Expand Up @@ -241,7 +232,30 @@ func (m *ExpirationManager) Tidy() error {

// Restore is used to recover the lease states when starting.
// This is used after starting the vault.
func (m *ExpirationManager) Restore(existing []string) {
func (m *ExpirationManager) Restore(errorFunc func() error, loadDelay time.Duration) (retErr error) {
defer func() {
if retErr != nil {
m.logger.Error("expiration: error restoring leases, shutting down", "error", retErr)
if err := errorFunc(); err != nil {
m.logger.Error("expiration: error shutting down", "error", err)
}
}
}()

atomic.StoreInt64(&m.restoreLock, 1)
defer atomic.StoreInt64(&m.restoreLock, 0)

// Restore the existing state
m.logger.Info("expiration: restoring leases")

// Accumulate existing leases
m.logger.Debug("expiration: collecting leases")
existing, err := logical.CollectKeys(m.idView)
if err != nil {
return fmt.Errorf("failed to scan for leases: %v", err)
}
m.logger.Debug("expiration: leases collected", "num_existing", len(existing))

// Make the channels used for the worker pool
broker := make(chan string)
quit := make(chan bool)
Expand All @@ -266,6 +280,10 @@ func (m *ExpirationManager) Restore(existing []string) {
return
}

if loadDelay > 0 {
time.Sleep(loadDelay)
}

le, err := m.loadEntry(leaseID)
if err != nil {
errs <- err
Expand Down Expand Up @@ -312,60 +330,60 @@ func (m *ExpirationManager) Restore(existing []string) {
case err := <-errs:
// Close all go routines
close(quit)
m.logger.Trace("expiration: error restoring leases", "error", err)
return
return err

case le := <-result:

// If there is no entry, nothing to restore
if le == nil {
continue
}

// If there is no expiry time, don't do anything
if le.ExpireTime.IsZero() {
continue
}

m.restoreLease(le)
restoredCount++

// Determine the remaining time to expiration
expires := le.ExpireTime.Sub(time.Now())
if expires <= 0 {
expires = minRevokeDelay
}

// Setup revocation timer
m.pendingLock.Lock()
m.pending[le.LeaseID] = time.AfterFunc(expires, func() {
m.expireID(le.LeaseID)
})
m.pendingLock.Unlock()
}
}

// Let all go routines finish
wg.Wait()

m.pendingLock.RLock()
if len(m.pending) > 0 {
if restoredCount > 0 {
if m.logger.IsInfo() {
m.logger.Info("expire: leases restored", "restored_lease_count", restoredCount)
m.logger.Info("expiration: leases restored", "restored_lease_count", restoredCount)
m.l.RLock()
m.logger.Info("expiration: pending leases", "lease_count", len(m.pending))
m.l.RUnlock()
}
}
m.pendingLock.RUnlock()

return nil
}

func (m *ExpirationManager) restoreLease(le *leaseEntry) {
// If there is no entry, nothing to restore
if le == nil {
return
}

// If there is no expiry time, don't do anything
if le.ExpireTime.IsZero() {
return
}

// Determine the remaining time to expiration
expires := le.ExpireTime.Sub(time.Now())
if expires <= 0 {
expires = minRevokeDelay
}

// Setup revocation timer
m.updatePending(le, expires)
}

// Stop is used to prevent further automatic revocations.
// This must be called before sealing the view.
func (m *ExpirationManager) Stop() error {
// Stop all the pending expiration timers
m.pendingLock.Lock()
m.l.Lock()
for _, timer := range m.pending {
timer.Stop()
}
m.pending = make(map[string]*time.Timer)
m.pendingLock.Unlock()
m.l.Unlock()
return nil
}

Expand All @@ -391,6 +409,9 @@ func (m *ExpirationManager) revokeCommon(leaseID string, force, skipToken bool)
return nil
}

m.l.Lock()
defer m.l.Unlock()

// Revoke the entry
if !skipToken || le.Auth == nil {
if err := m.revokeEntry(le); err != nil {
Expand All @@ -417,12 +438,10 @@ func (m *ExpirationManager) revokeCommon(leaseID string, force, skipToken bool)
}

// Clear the expiration handler
m.pendingLock.Lock()
if timer, ok := m.pending[leaseID]; ok {
timer.Stop()
delete(m.pending, leaseID)
}
m.pendingLock.Unlock()
return nil
}

Expand Down Expand Up @@ -796,8 +815,8 @@ func (m *ExpirationManager) FetchLeaseTimes(leaseID string) (*leaseEntry, error)

// updatePending is used to update a pending invocation for a lease
func (m *ExpirationManager) updatePending(le *leaseEntry, leaseTotal time.Duration) {
m.pendingLock.Lock()
defer m.pendingLock.Unlock()
m.l.Lock()
defer m.l.Unlock()

// Check for an existing timer
timer, ok := m.pending[le.LeaseID]
Expand Down Expand Up @@ -827,9 +846,9 @@ func (m *ExpirationManager) updatePending(le *leaseEntry, leaseTotal time.Durati
// expireID is invoked when a given ID is expired
func (m *ExpirationManager) expireID(leaseID string) {
// Clear from the pending expiration
m.pendingLock.Lock()
m.l.Lock()
delete(m.pending, leaseID)
m.pendingLock.Unlock()
m.l.Unlock()

for attempt := uint(0); attempt < maxRevokeAttempts; attempt++ {
err := m.Revoke(leaseID)
Expand Down Expand Up @@ -915,6 +934,11 @@ func (m *ExpirationManager) loadEntry(leaseID string) (*leaseEntry, error) {
if err != nil {
return nil, fmt.Errorf("failed to decode lease entry: %v", err)
}

if atomic.LoadInt64(&m.restoreLock) == 1 {
m.restoreLease(le)
}

return le, nil
}

Expand Down Expand Up @@ -1037,9 +1061,9 @@ func (m *ExpirationManager) lookupByToken(token string) ([]string, error) {

// emitMetrics is invoked periodically to emit statistics
func (m *ExpirationManager) emitMetrics() {
m.pendingLock.RLock()
m.l.RLock()
num := len(m.pending)
m.pendingLock.RUnlock()
m.l.RUnlock()
metrics.SetGauge([]string{"expire", "num_leases"}, float32(num))
}

Expand Down

0 comments on commit 8fc8fbd

Please sign in to comment.