Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use global lock instead of NewExclusivePool to allow distributed lock between multiple Gitea instances #31813

Merged
merged 18 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions modules/globallock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import (
)

type Locker interface {
Lock() error
TryLock() (bool, error)
Unlock() (bool, error)
Lock() error // lock the resource and block until it is unlocked by the holder
TryLock() (bool, error) // try to lock the resource and return immediately, first return value indicates if the lock was successful
Unlock() (bool, error) // only lock with no error and TryLock returned true with no error can be unlocked
}

type LockService interface {
GetLock(name string) Locker
GetLocker(name string) Locker // create or get a locker by name, RemoveLocker should be called after the locker is no longer needed
RemoveLocker(name string) // remove a locker by name from the pool. This should be invoked affect locker is no longer needed, i.e. a pull request merged or closed
}

type memoryLock struct {
Expand Down Expand Up @@ -57,11 +58,15 @@ func newMemoryLockService() *memoryLockService {
}
}

func (l *memoryLockService) GetLock(name string) Locker {
func (l *memoryLockService) GetLocker(name string) Locker {
v, _ := l.syncMap.LoadOrStore(name, &memoryLock{})
return v.(*memoryLock)
}

func (l *memoryLockService) RemoveLocker(name string) {
l.syncMap.Delete(name)
lunny marked this conversation as resolved.
Show resolved Hide resolved
}

type redisLockService struct {
rs *redsync.Redsync
}
Expand All @@ -86,10 +91,14 @@ type redisLock struct {
mutex *redsync.Mutex
}

func (r *redisLockService) GetLock(name string) Locker {
func (r *redisLockService) GetLocker(name string) Locker {
return &redisLock{mutex: r.rs.NewMutex(name)}
}

func (r *redisLockService) RemoveLocker(name string) {
// Do nothing
}

func (r *redisLock) Lock() error {
return r.mutex.Lock()
lunny marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down Expand Up @@ -123,6 +132,6 @@ func getLockService() LockService {
return lockService
}

func GetLock(name string) Locker {
return getLockService().GetLock(name)
func GetLocker(name string) Locker {
return getLockService().GetLocker(name)
}
55 changes: 47 additions & 8 deletions modules/globallock/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,56 @@ import (
)

func Test_Lock(t *testing.T) {
locker := GetLock("test")
assert.NoError(t, locker.Lock())
locker.Unlock()
locker1 := GetLocker("test2")
assert.NoError(t, locker1.Lock())
unlocked, err := locker1.Unlock()
assert.NoError(t, err)
assert.True(t, unlocked)

locked1, err1 := locker.TryLock()
locker2 := GetLocker("test2")
assert.NoError(t, locker2.Lock())

locked1, err1 := locker2.TryLock()
assert.NoError(t, err1)
assert.False(t, locked1)

locker2.Unlock()

locked2, err2 := locker2.TryLock()
assert.NoError(t, err2)
assert.True(t, locked2)

locker2.Unlock()
}

func Test_Lock_Redis(t *testing.T) {
if os.Getenv("CI") == "" {
t.Skip("Skip test for local development")
}

lockService = newRedisLockService("redis://redis")

redisPool :=
locker1 := GetLocker("test1")
assert.NoError(t, locker1.Lock())
unlocked, err := locker1.Unlock()
assert.NoError(t, err)
assert.True(t, unlocked)

locker2 := GetLocker("test1")
assert.NoError(t, locker2.Lock())

locked1, err1 := locker2.TryLock()
assert.NoError(t, err1)
assert.True(t, locked1)
assert.False(t, locked1)

locker2.Unlock()

locked2, err2 := locker.TryLock()
locked2, err2 := locker2.TryLock()
assert.NoError(t, err2)
assert.False(t, locked2)
assert.True(t, locked2)

locker2.Unlock()

locker.Unlock()
redisPool.Close()
}
6 changes: 3 additions & 3 deletions services/pull/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,13 @@ func handler(items ...string) []string {
}

func testPR(id int64) {
lock := globallock.GetLock(getPullWorkingLockKey(id))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getPullWorkingLockKey(id))
if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err)
return
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down
12 changes: 6 additions & 6 deletions services/pull/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
return fmt.Errorf("unable to load head repo: %w", err)
}

lock := globallock.GetLock(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down Expand Up @@ -492,13 +492,13 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques

// MergedManually mark pr as merged manually
func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
lock := globallock.GetLock(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down
6 changes: 3 additions & 3 deletions services/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,13 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss

// ChangeTargetBranch changes the target branch of this pull request, as the given user.
func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) {
lock := globallock.GetLock(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock(): %v", err)
}
}()
Expand Down
6 changes: 3 additions & 3 deletions services/pull/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
return fmt.Errorf("update of agit flow pull request's head branch is unsupported")
}

lock := globallock.GetLock(getPullWorkingLockKey(pr.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getPullWorkingLockKey(pr.ID))
if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down
12 changes: 6 additions & 6 deletions services/repository/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TransferOwnership(ctx context.Context, doer, newOwner *user_model.User, rep

oldOwner := repo.Owner

lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getRepoWorkingLockKey(repo.ID))
if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down Expand Up @@ -371,14 +371,14 @@ func ChangeRepositoryName(ctx context.Context, doer *user_model.User, repo *repo
// repo so that we can atomically rename the repo path and updates the
// local copy's origin accordingly.

lock := globallock.GetLock(getRepoWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getRepoWorkingLockKey(repo.ID))
if err := locker.Lock(); err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}

defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down
12 changes: 6 additions & 6 deletions services/wiki/wiki.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ func updateWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model
if err = validateWebPath(newWikiName); err != nil {
return err
}
lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getWikiWorkingLockKey(repo.ID))
if err := locker.Lock(); err != nil {
return err
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down Expand Up @@ -258,12 +258,12 @@ func DeleteWikiPage(ctx context.Context, doer *user_model.User, repo *repo_model
return err
}

lock := globallock.GetLock(getWikiWorkingLockKey(repo.ID))
if err := lock.Lock(); err != nil {
locker := globallock.GetLocker(getWikiWorkingLockKey(repo.ID))
if err := locker.Lock(); err != nil {
return err
}
defer func() {
if _, err := lock.Unlock(); err != nil {
if _, err := locker.Unlock(); err != nil {
log.Error("lock.Unlock: %v", err)
}
}()
Expand Down