Skip to content

Commit

Permalink
Some atomic cleanup
Browse files Browse the repository at this point in the history
Taking inspiration from
golang/go#17604 (comment)
suggests that taking the address of a stack variable for use in atomics
works (at least, the race detector doesn't complain) but is doing it
wrong.

The only other change is a change in Leader() detecting if HA is enabled
to fast-path out. This value never changes after NewCore, so we don't
need to grab the read lock to check it.
  • Loading branch information
jefferai committed Jun 9, 2018
1 parent 8eee1da commit 3c1d57d
Show file tree
Hide file tree
Showing 16 changed files with 121 additions and 105 deletions.
4 changes: 3 additions & 1 deletion builtin/credential/approle/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type backend struct {
view logical.Storage

// Guard to clean-up the expired SecretID entries
tidySecretIDCASGuard uint32
tidySecretIDCASGuard *uint32

// Locks to make changes to role entries. These will be initialized to a
// predefined number of locks when the backend is created, and will be
Expand Down Expand Up @@ -85,6 +85,8 @@ func Backend(conf *logical.BackendConfig) (*backend, error) {

// Create locks to modify the generated SecretIDAccessors
secretIDAccessorLocks: locksutil.CreateLocks(),

tidySecretIDCASGuard: new(uint32),
}

// Attach the paths and secrets that are to be handled by the backend
Expand Down
4 changes: 2 additions & 2 deletions builtin/credential/approle/path_tidy_user_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func pathTidySecretID(b *backend) *framework.Path {

// tidySecretID is used to delete entries in the whitelist that are expired.
func (b *backend) tidySecretID(ctx context.Context, s logical.Storage) error {
grabbed := atomic.CompareAndSwapUint32(&b.tidySecretIDCASGuard, 0, 1)
grabbed := atomic.CompareAndSwapUint32(b.tidySecretIDCASGuard, 0, 1)
if grabbed {
defer atomic.StoreUint32(&b.tidySecretIDCASGuard, 0)
defer atomic.StoreUint32(b.tidySecretIDCASGuard, 0)
} else {
return fmt.Errorf("SecretID tidy operation already running")
}
Expand Down
14 changes: 8 additions & 6 deletions builtin/credential/aws/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type backend struct {
blacklistMutex sync.RWMutex

// Guards the blacklist/whitelist tidy functions
tidyBlacklistCASGuard uint32
tidyWhitelistCASGuard uint32
tidyBlacklistCASGuard *uint32
tidyWhitelistCASGuard *uint32

// Duration after which the periodic function of the backend needs to
// tidy the blacklist and whitelist entries.
Expand Down Expand Up @@ -82,10 +82,12 @@ func Backend(conf *logical.BackendConfig) (*backend, error) {
b := &backend{
// Setting the periodic func to be run once in an hour.
// If there is a real need, this can be made configurable.
tidyCooldownPeriod: time.Hour,
EC2ClientsMap: make(map[string]map[string]*ec2.EC2),
IAMClientsMap: make(map[string]map[string]*iam.IAM),
iamUserIdToArnCache: cache.New(7*24*time.Hour, 24*time.Hour),
tidyCooldownPeriod: time.Hour,
EC2ClientsMap: make(map[string]map[string]*ec2.EC2),
IAMClientsMap: make(map[string]map[string]*iam.IAM),
iamUserIdToArnCache: cache.New(7*24*time.Hour, 24*time.Hour),
tidyBlacklistCASGuard: new(uint32),
tidyWhitelistCASGuard: new(uint32),
}

b.resolveArnToUniqueIDFunc = b.resolveArnToRealUniqueId
Expand Down
4 changes: 2 additions & 2 deletions builtin/credential/aws/path_tidy_identity_whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ expiration, before it is removed from the backend storage.`,

// tidyWhitelistIdentity is used to delete entries in the whitelist that are expired.
func (b *backend) tidyWhitelistIdentity(ctx context.Context, s logical.Storage, safety_buffer int) error {
grabbed := atomic.CompareAndSwapUint32(&b.tidyWhitelistCASGuard, 0, 1)
grabbed := atomic.CompareAndSwapUint32(b.tidyWhitelistCASGuard, 0, 1)
if grabbed {
defer atomic.StoreUint32(&b.tidyWhitelistCASGuard, 0)
defer atomic.StoreUint32(b.tidyWhitelistCASGuard, 0)
} else {
return fmt.Errorf("identity whitelist tidy operation already running")
}
Expand Down
4 changes: 2 additions & 2 deletions builtin/credential/aws/path_tidy_roletag_blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ expiration, before it is removed from the backend storage.`,

// tidyBlacklistRoleTag is used to clean-up the entries in the role tag blacklist.
func (b *backend) tidyBlacklistRoleTag(ctx context.Context, s logical.Storage, safety_buffer int) error {
grabbed := atomic.CompareAndSwapUint32(&b.tidyBlacklistCASGuard, 0, 1)
grabbed := atomic.CompareAndSwapUint32(b.tidyBlacklistCASGuard, 0, 1)
if grabbed {
defer atomic.StoreUint32(&b.tidyBlacklistCASGuard, 0)
defer atomic.StoreUint32(b.tidyBlacklistCASGuard, 0)
} else {
return fmt.Errorf("roletag blacklist tidy operation already running")
}
Expand Down
36 changes: 19 additions & 17 deletions http/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,26 @@ func testHTTP_Forwarding_Stress_Common(t *testing.T, parallel bool, num uint64)
var key1ver int64 = 1
var key2ver int64 = 1
var key3ver int64 = 1
var numWorkers uint64 = 50
var numWorkersStarted uint64
var numWorkers *uint32 = new(uint32)
*numWorkers = 50
var numWorkersStarted *uint32 = new(uint32)
var waitLock sync.Mutex
waitCond := sync.NewCond(&waitLock)

// This is the goroutine loop
doFuzzy := func(id int, parallel bool) {
var myTotalOps uint64
var mySuccessfulOps uint64
var keyVer int64 = 1
var myTotalOps *uint32 = new(uint32)
var mySuccessfulOps *uint32 = new(uint32)
var keyVer *int32 = new(int32)
*keyVer = 1
// Check for panics, otherwise notify we're done
defer func() {
if err := recover(); err != nil {
core.Logger().Error("got a panic: %v", err)
t.Fail()
}
atomic.AddUint64(&totalOps, myTotalOps)
atomic.AddUint64(&successfulOps, mySuccessfulOps)
atomic.AddUint32(totalOps, myTotalOps)
atomic.AddUint32(successfulOps, mySuccessfulOps)
wg.Done()
}()

Expand Down Expand Up @@ -281,10 +283,10 @@ func testHTTP_Forwarding_Stress_Common(t *testing.T, parallel bool, num uint64)
}
}

atomic.AddUint64(&numWorkersStarted, 1)
atomic.AddUint32(numWorkersStarted, 1)

waitCond.L.Lock()
for atomic.LoadUint64(&numWorkersStarted) != numWorkers {
for atomic.LoadUint32(numWorkersStarted) != atomic.LoadUint32(numWorkers) {
waitCond.Wait()
}
waitCond.L.Unlock()
Expand Down Expand Up @@ -375,11 +377,11 @@ func testHTTP_Forwarding_Stress_Common(t *testing.T, parallel bool, num uint64)
if parallel {
switch chosenKey {
case "test1":
atomic.AddInt64(&key1ver, 1)
atomic.AddInt32(key1ver, 1)
case "test2":
atomic.AddInt64(&key2ver, 1)
atomic.AddInt32(key2ver, 1)
case "test3":
atomic.AddInt64(&key3ver, 1)
atomic.AddInt32(key3ver, 1)
}
} else {
keyVer++
Expand All @@ -393,11 +395,11 @@ func testHTTP_Forwarding_Stress_Common(t *testing.T, parallel bool, num uint64)
if parallel {
switch chosenKey {
case "test1":
latestVersion = atomic.LoadInt64(&key1ver)
latestVersion = atomic.LoadInt32(key1ver)
case "test2":
latestVersion = atomic.LoadInt64(&key2ver)
latestVersion = atomic.LoadInt32(key2ver)
case "test3":
latestVersion = atomic.LoadInt64(&key3ver)
latestVersion = atomic.LoadInt32(key3ver)
}
}

Expand All @@ -415,10 +417,10 @@ func testHTTP_Forwarding_Stress_Common(t *testing.T, parallel bool, num uint64)
}
}

atomic.StoreUint64(&numWorkers, num)
atomic.StoreUint32(numWorkers, num)

// Spawn some of these workers for 10 seconds
for i := 0; i < int(atomic.LoadUint64(&numWorkers)); i++ {
for i := 0; i < int(atomic.LoadUint32(numWorkers)); i++ {
wg.Add(1)
//core.Logger().Printf("[TRACE] spawning %d", i)
go doFuzzy(i+1, parallel)
Expand Down
30 changes: 15 additions & 15 deletions logical/framework/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ func TestBackendHandleRequest_renewAuth(t *testing.T) {
}

func TestBackendHandleRequest_renewAuthCallback(t *testing.T) {
var called uint32
called := new(uint32)
callback := func(context.Context, *logical.Request, *FieldData) (*logical.Response, error) {
atomic.AddUint32(&called, 1)
atomic.AddUint32(called, 1)
return nil, nil
}

Expand All @@ -217,14 +217,14 @@ func TestBackendHandleRequest_renewAuthCallback(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if v := atomic.LoadUint32(&called); v != 1 {
if v := atomic.LoadUint32(called); v != 1 {
t.Fatalf("bad: %#v", v)
}
}
func TestBackendHandleRequest_renew(t *testing.T) {
var called uint32
called := new(uint32)
callback := func(context.Context, *logical.Request, *FieldData) (*logical.Response, error) {
atomic.AddUint32(&called, 1)
atomic.AddUint32(called, 1)
return nil, nil
}

Expand All @@ -240,15 +240,15 @@ func TestBackendHandleRequest_renew(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if v := atomic.LoadUint32(&called); v != 1 {
if v := atomic.LoadUint32(called); v != 1 {
t.Fatalf("bad: %#v", v)
}
}

func TestBackendHandleRequest_revoke(t *testing.T) {
var called uint32
called := new(uint32)
callback := func(context.Context, *logical.Request, *FieldData) (*logical.Response, error) {
atomic.AddUint32(&called, 1)
atomic.AddUint32(called, 1)
return nil, nil
}

Expand All @@ -264,16 +264,16 @@ func TestBackendHandleRequest_revoke(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if v := atomic.LoadUint32(&called); v != 1 {
if v := atomic.LoadUint32(called); v != 1 {
t.Fatalf("bad: %#v", v)
}
}

func TestBackendHandleRequest_rollback(t *testing.T) {
var called uint32
called := new(uint32)
callback := func(_ context.Context, req *logical.Request, kind string, data interface{}) error {
if data == "foo" {
atomic.AddUint32(&called, 1)
atomic.AddUint32(called, 1)
}
return nil
}
Expand All @@ -298,16 +298,16 @@ func TestBackendHandleRequest_rollback(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if v := atomic.LoadUint32(&called); v != 1 {
if v := atomic.LoadUint32(called); v != 1 {
t.Fatalf("bad: %#v", v)
}
}

func TestBackendHandleRequest_rollbackMinAge(t *testing.T) {
var called uint32
called := new(uint32)
callback := func(_ context.Context, req *logical.Request, kind string, data interface{}) error {
if data == "foo" {
atomic.AddUint32(&called, 1)
atomic.AddUint32(called, 1)
}
return nil
}
Expand All @@ -330,7 +330,7 @@ func TestBackendHandleRequest_rollbackMinAge(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if v := atomic.LoadUint32(&called); v != 0 {
if v := atomic.LoadUint32(called); v != 0 {
t.Fatalf("bad: %#v", v)
}
}
Expand Down
12 changes: 6 additions & 6 deletions physical/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,9 +636,9 @@ func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh ph
// and end of a handler's life (or after a handler wakes up from
// sleeping during a back-off/retry).
var shutdown bool
var checkLock int64
var registeredServiceID string
var serviceRegLock int64
checkLock := new(int32)
serviceRegLock := new(int32)

for !shutdown {
select {
Expand All @@ -654,10 +654,10 @@ func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh ph

// Abort if service discovery is disabled or a
// reconcile handler is already active
if !c.disableRegistration && atomic.CompareAndSwapInt64(&serviceRegLock, 0, 1) {
if !c.disableRegistration && atomic.CompareAndSwapInt32(serviceRegLock, 0, 1) {
// Enter handler with serviceRegLock held
go func() {
defer atomic.CompareAndSwapInt64(&serviceRegLock, 1, 0)
defer atomic.CompareAndSwapInt32(serviceRegLock, 1, 0)
for !shutdown {
serviceID, err := c.reconcileConsul(registeredServiceID, activeFunc, sealedFunc)
if err != nil {
Expand All @@ -680,10 +680,10 @@ func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh ph
checkTimer.Reset(c.checkDuration())
// Abort if service discovery is disabled or a
// reconcile handler is active
if !c.disableRegistration && atomic.CompareAndSwapInt64(&checkLock, 0, 1) {
if !c.disableRegistration && atomic.CompareAndSwapInt32(checkLock, 0, 1) {
// Enter handler with checkLock held
go func() {
defer atomic.CompareAndSwapInt64(&checkLock, 1, 0)
defer atomic.CompareAndSwapInt32(checkLock, 1, 0)
for !shutdown {
sealed := sealedFunc()
if err := c.runCheck(sealed); err != nil {
Expand Down
Loading

0 comments on commit 3c1d57d

Please sign in to comment.