Skip to content

Commit

Permalink
Tackle #4929 a different way (#4932)
Browse files Browse the repository at this point in the history
* Tackle #4929 a different way

This turns c.sealed into an atomic, which allows us to call sealInternal
without a lock. By doing so we can better control lock grabbing when a
condition occurs that causes the standby loop to drop out of the active
state. This encapsulates that logic into two distinct pieces (although
they could be combined into one), and makes lock guarding more understandable.

* Re-add context canceling to the non-HA version of sealInternal

* Return explicitly after stopCh triggered
  • Loading branch information
jefferai authored and briankassouf committed Jul 24, 2018
1 parent 8580cd3 commit 8d2d9fd
Show file tree
Hide file tree
Showing 20 changed files with 154 additions and 311 deletions.
9 changes: 1 addition & 8 deletions command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,14 +837,7 @@ CLUSTER_SYNTHESIS_COMPLETE:
return false
}

sealedFunc := func() bool {
if sealed, err := core.Sealed(); err == nil {
return sealed
}
return true
}

if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, sealedFunc); err != nil {
if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, core.Sealed); err != nil {
c.UI.Error(fmt.Sprintf("Error initializing service discovery: %v", err))
return 1
}
Expand Down
2 changes: 1 addition & 1 deletion http/sys_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
ctx := context.Background()

// Check system status
sealed, _ := core.Sealed()
sealed := core.Sealed()
standby, _ := core.Standby()
var replicationState consts.ReplicationState
if standby {
Expand Down
6 changes: 1 addition & 5 deletions http/sys_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ func TestSysInit_put(t *testing.T) {
}
}

seal, err := core.Sealed()
if err != nil {
t.Fatalf("err: %s", err)
}
if seal {
if core.Sealed() {
t.Fatal("should not be sealed")
}
}
14 changes: 3 additions & 11 deletions http/sys_seal.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,7 @@ func handleSysUnseal(core *vault.Core) http.Handler {
}

if req.Reset {
sealed, err := core.Sealed()
if err != nil {
respondError(w, http.StatusInternalServerError, err)
return
}
if !sealed {
if !core.Sealed() {
respondError(w, http.StatusBadRequest, errors.New("vault is unsealed"))
return
}
Expand Down Expand Up @@ -164,13 +159,10 @@ func handleSysSealStatus(core *vault.Core) http.Handler {
func handleSysSealStatusRaw(core *vault.Core, w http.ResponseWriter, r *http.Request) {
ctx := context.Background()

sealed, err := core.Sealed()
if err != nil {
respondError(w, http.StatusInternalServerError, err)
return
}
sealed := core.Sealed()

var sealConfig *vault.SealConfig
var err error
if core.SealAccess().RecoveryKeySupported() {
sealConfig, err = core.SealAccess().RecoveryConfig(ctx)
} else {
Expand Down
12 changes: 2 additions & 10 deletions http/sys_seal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ func TestSysSeal(t *testing.T) {
resp := testHttpPut(t, token, addr+"/v1/sys/seal", nil)
testResponseStatus(t, resp, 204)

check, err := core.Sealed()
if err != nil {
t.Fatalf("err: %s", err)
}
if !check {
if !core.Sealed() {
t.Fatal("should be sealed")
}
}
Expand All @@ -93,11 +89,7 @@ func TestSysSeal_unsealed(t *testing.T) {
resp := testHttpPut(t, token, addr+"/v1/sys/seal", nil)
testResponseStatus(t, resp, 204)

check, err := core.Sealed()
if err != nil {
t.Fatalf("err: %s", err)
}
if !check {
if !core.Sealed() {
t.Fatal("should be sealed")
}
}
Expand Down
6 changes: 1 addition & 5 deletions vault/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ func TestClusterHAFetching(t *testing.T) {
}

// Verify unsealed
sealed, err := c.Sealed()
if err != nil {
t.Fatalf("err checking seal status: %s", err)
}
if sealed {
if c.Sealed() {
t.Fatal("should not be sealed")
}

Expand Down
84 changes: 35 additions & 49 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ type Core struct {

// stateLock protects mutable state
stateLock sync.RWMutex
sealed bool
sealed *uint32

standby bool
standbyDoneCh chan struct{}
Expand Down Expand Up @@ -480,7 +480,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
clusterAddr: conf.ClusterAddr,
seal: conf.Seal,
router: NewRouter(),
sealed: true,
sealed: new(uint32),
standby: true,
logger: conf.Logger.Named("core"),
defaultLeaseTTL: conf.DefaultLeaseTTL,
Expand All @@ -503,6 +503,8 @@ func NewCore(conf *CoreConfig) (*Core, error) {
keepHALockOnStepDown: new(uint32),
}

atomic.StoreUint32(c.sealed, 1)

atomic.StoreUint32(c.replicationState, uint32(consts.ReplicationDRDisabled|consts.ReplicationPerformanceDisabled))
c.localClusterCert.Store(([]byte)(nil))
c.localClusterParsedCert.Store((*x509.Certificate)(nil))
Expand Down Expand Up @@ -634,19 +636,6 @@ func NewCore(conf *CoreConfig) (*Core, error) {
// happens as quickly as possible.
func (c *Core) Shutdown() error {
c.logger.Debug("shutdown called")
c.stateLock.RLock()
// Tell any requests that know about this to stop
if c.activeContextCancelFunc != nil {
c.activeContextCancelFunc()
}
c.stateLock.RUnlock()

c.logger.Debug("shutdown initiating internal seal")
// Seal the Vault, causes a leader stepdown
c.stateLock.Lock()
defer c.stateLock.Unlock()

c.logger.Debug("shutdown running internal seal")
return c.sealInternal(false)
}

Expand All @@ -663,10 +652,8 @@ func (c *Core) GetContext() (context.Context, context.CancelFunc) {
}

// Sealed checks if the Vault is currently sealed
func (c *Core) Sealed() (bool, error) {
c.stateLock.RLock()
defer c.stateLock.RUnlock()
return c.sealed, nil
func (c *Core) Sealed() bool {
	// Lock-free read of the atomic seal flag; a value of 1 means sealed.
	// This replaces the previous stateLock-guarded bool, so callers no
	// longer need (or take) the state lock just to query seal status.
	return atomic.LoadUint32(c.sealed) == 1
}

// SecretProgress returns the number of keys provided so far
Expand All @@ -686,9 +673,6 @@ func (c *Core) SecretProgress() (int, string) {
func (c *Core) ResetUnsealProcess() {
c.stateLock.Lock()
defer c.stateLock.Unlock()
if !c.sealed {
return
}
c.unlockInfo = nil
}

Expand Down Expand Up @@ -732,7 +716,7 @@ func (c *Core) Unseal(key []byte) (bool, error) {
}

// Check if already unsealed
if !c.sealed {
if !c.Sealed() {
return true, nil
}

Expand Down Expand Up @@ -774,7 +758,7 @@ func (c *Core) UnsealWithRecoveryKeys(ctx context.Context, key []byte) (bool, er
}

// Check if already unsealed
if !c.sealed {
if !c.Sealed() {
return true, nil
}

Expand Down Expand Up @@ -918,14 +902,14 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
go c.runStandby(c.standbyDoneCh, c.manualStepDownCh, c.standbyStopCh)
}

// Success!
c.sealed = false

// Force a cache bust here, which will also run migration code
if c.seal.RecoveryKeySupported() {
c.seal.SetRecoveryConfig(ctx, nil)
}

// Success!
atomic.StoreUint32(c.sealed, 0)

if c.ha != nil {
sd, ok := c.ha.(physical.ServiceDiscovery)
if ok {
Expand All @@ -944,13 +928,12 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
func (c *Core) SealWithRequest(req *logical.Request) error {
defer metrics.MeasureSince([]string{"core", "seal-with-request"}, time.Now())

c.stateLock.RLock()

if c.sealed {
c.stateLock.RUnlock()
if c.Sealed() {
return nil
}

c.stateLock.RLock()

// This will unlock the read lock
// We use background context since we may not be active
return c.sealInitCommon(context.Background(), req)
Expand All @@ -961,13 +944,12 @@ func (c *Core) SealWithRequest(req *logical.Request) error {
func (c *Core) Seal(token string) error {
defer metrics.MeasureSince([]string{"core", "seal"}, time.Now())

c.stateLock.RLock()

if c.sealed {
c.stateLock.RUnlock()
if c.Sealed() {
return nil
}

c.stateLock.RLock()

req := &logical.Request{
Operation: logical.UpdateOperation,
Path: "sys/seal",
Expand Down Expand Up @@ -1094,17 +1076,9 @@ func (c *Core) sealInitCommon(ctx context.Context, req *logical.Request) (retErr
}
}

// Tell any requests that know about this to stop
if c.activeContextCancelFunc != nil {
c.activeContextCancelFunc()
}

// Unlock from the request handling
// Unlock; sealing will grab the lock when needed
c.stateLock.RUnlock()

//Seal the Vault
c.stateLock.Lock()
defer c.stateLock.Unlock()
sealErr := c.sealInternal(false)

if sealErr != nil {
Expand All @@ -1125,15 +1099,13 @@ func (c *Core) UIHeaders() (http.Header, error) {
}

// sealInternal is an internal method used to seal the vault. It does not do
// any authorization checking. The stateLock must be held prior to calling.
// any authorization checking.
func (c *Core) sealInternal(keepLock bool) error {
if c.sealed {
// Mark sealed, and if already marked return
if swapped := atomic.CompareAndSwapUint32(c.sealed, 0, 1); !swapped {
return nil
}

// Enable that we are sealed to prevent further transactions
c.sealed = true

c.logger.Debug("marked as sealed")

// Clear forwarding clients
Expand All @@ -1143,15 +1115,29 @@ func (c *Core) sealInternal(keepLock bool) error {

// Do pre-seal teardown if HA is not enabled
if c.ha == nil {
c.stateLock.Lock()
defer c.stateLock.Unlock()
// Even in a non-HA context we key off of this for some things
c.standby = true

// Stop requests from processing
if c.activeContextCancelFunc != nil {
c.activeContextCancelFunc()
}

if err := c.preSeal(); err != nil {
c.logger.Error("pre-seal teardown failed", "error", err)
return fmt.Errorf("internal error")
}
} else {
// If we are keeping the lock we already have the state write lock
// held. Otherwise grab it here so that when stopCh is triggered we are
// locked.
if keepLock {
atomic.StoreUint32(c.keepHALockOnStepDown, 1)
} else {
c.stateLock.Lock()
defer c.stateLock.Unlock()
}
// If we are trying to acquire the lock, force it to return with nil so
// runStandby will exit
Expand Down
Loading

0 comments on commit 8d2d9fd

Please sign in to comment.