Skip to content

Commit

Permalink
database: add (Acquire|Release)Lock dbutils
Browse files Browse the repository at this point in the history
  • Loading branch information
jzelinskie committed Nov 27, 2018
1 parent 504f0f3 commit 4fbeb9c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 52 deletions.
46 changes: 46 additions & 0 deletions database/dbutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package database

import (
"time"

"github.com/deckarep/golang-set"
)

Expand Down Expand Up @@ -304,3 +306,47 @@ func MergeLayers(l *Layer, new *Layer) *Layer {

return l
}

// AcquireLock acquires a named global lock for a duration.
//
// If renewal is true, the lock is extended as long as the same owner is
// attempting to renew the lock.
func AcquireLock(datastore Datastore, name, owner string, duration time.Duration, renewal bool) (success bool, expiration time.Time) {
// any error will cause the function to catch the error and return false.
tx, err := datastore.Begin()
if err != nil {
return false, time.Time{}
}

defer tx.Rollback()

locked, t, err := tx.Lock(name, owner, duration, renewal)
if err != nil {
return false, time.Time{}
}

if locked {
if err := tx.Commit(); err != nil {
return false, time.Time{}
}
}

return locked, t
}

// ReleaseLock releases a named global lock.
func ReleaseLock(datastore Datastore, name, owner string) {
tx, err := datastore.Begin()
if err != nil {
return
}

defer tx.Rollback()

if err := tx.Unlock(name, owner); err != nil {
return
}
if err := tx.Commit(); err != nil {
return
}
}
47 changes: 3 additions & 44 deletions notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop
if interrupted {
running = false
}
unlock(datastore, notification.Name, whoAmI)
database.ReleaseLock(datastore, notification.Name, whoAmI)
done <- true
}()

Expand All @@ -113,7 +113,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop
case <-done:
break outer
case <-time.After(notifierLockRefreshDuration):
lock(datastore, notification.Name, whoAmI, notifierLockDuration, true)
database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, true)
case <-stopper.Chan():
running = false
break
Expand Down Expand Up @@ -141,7 +141,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA
}

// Lock the notification.
if hasLock, _ := lock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock {
if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock {
log.WithField(logNotiName, notification.Name).Info("found and locked a notification")
return &notification
}
Expand Down Expand Up @@ -208,44 +208,3 @@ func markNotificationAsRead(datastore database.Datastore, name string) error {
}
return tx.Commit()
}

// unlock removes a lock with provided name, owner. Internally, it handles
// database transaction and catches error.
func unlock(datastore database.Datastore, name, owner string) {
tx, err := datastore.Begin()
if err != nil {
return
}

defer tx.Rollback()

if err := tx.Unlock(name, owner); err != nil {
return
}
if err := tx.Commit(); err != nil {
return
}
}

func lock(datastore database.Datastore, name string, owner string, duration time.Duration, renew bool) (bool, time.Time) {
// any error will cause the function to catch the error and return false.
tx, err := datastore.Begin()
if err != nil {
return false, time.Time{}
}

defer tx.Rollback()

locked, t, err := tx.Lock(name, owner, duration, renew)
if err != nil {
return false, time.Time{}
}

if locked {
if err := tx.Commit(); err != nil {
return false, time.Time{}
}
}

return locked, t
}
15 changes: 7 additions & 8 deletions updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,46 +91,45 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper
log.WithField("lock identifier", whoAmI).Info("updater service started")

for {
var stop bool

// Determine if this is the first update and define the next update time.
// The next update time is (last update time + interval) or now if this is the first update.
nextUpdate := time.Now().UTC()
lastUpdate, firstUpdate, err := GetLastUpdateTime(datastore)
lastUpdate, isFirstUpdate, err := GetLastUpdateTime(datastore)
if err != nil {
log.WithError(err).Error("an error occurred while getting the last update time")
nextUpdate = nextUpdate.Add(config.Interval)
} else if !firstUpdate {
} else if !isFirstUpdate {
nextUpdate = lastUpdate.Add(config.Interval)
}

// If the next update timer is in the past, then try to update.
if nextUpdate.Before(time.Now().UTC()) {
// Attempt to get a lock on the the update.
log.Debug("attempting to obtain update lock")
hasLock, hasLockUntil := lock(datastore, updaterLockName, whoAmI, updaterLockDuration, false)
hasLock, hasLockUntil := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false)
if hasLock {
// Launch update in a new go routine.
doneC := make(chan bool, 1)
go func() {
update(datastore, firstUpdate)
update(datastore, isFirstUpdate)
doneC <- true
}()

var stop bool
for done := false; !done && !stop; {
select {
case <-doneC:
done = true
case <-time.After(updaterLockRefreshDuration):
// Refresh the lock until the update is done.
lock(datastore, updaterLockName, whoAmI, updaterLockDuration, true)
database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, true)
case <-st.Chan():
stop = true
}
}

// Unlock the updater.
unlock(datastore, updaterLockName, whoAmI)
database.ReleaseLock(datastore, updaterLockName, whoAmI)

if stop {
break
Expand Down

0 comments on commit 4fbeb9c

Please sign in to comment.