Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use global lock instead of NewExclusivePool to allow distributed lock between multiple Gitea instances #31813

Merged
merged 18 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions assets/go-licenses.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2713,3 +2713,9 @@ LEVEL = Info
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; storage type
;STORAGE_TYPE = local

;[global_lock]
;; Lock service type, could be memory or redis
;SERVICE_TYPE = memory
;; Ignored for the "memory" type. For "redis" use something like `redis://127.0.0.1:6379/0`
;SERVICE_CONN_STR =
12 changes: 10 additions & 2 deletions modules/globallock/globallock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ package globallock
import (
"context"
"sync"

"code.gitea.io/gitea/modules/setting"
)

var (
defaultLocker Locker
initOnce sync.Once
initFunc = func() {
// TODO: read the setting and initialize the default locker.
// Before implementing this, don't use it.
switch setting.GlobalLock.ServiceType {
case "redis":
defaultLocker = NewRedisLocker(setting.GlobalLock.ServiceConnStr)
case "memory":
fallthrough
default:
defaultLocker = NewMemoryLocker()
}
} // define initFunc as a variable to make it possible to change it in tests
)

Expand Down
37 changes: 37 additions & 0 deletions modules/setting/gloabl_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package setting

import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/nosql"
)

// GlobalLock represents configuration of global lock
var GlobalLock = struct {
ServiceType string
ServiceConnStr string
}{
ServiceType: "memory",
}

func loadGlobalLockFrom(rootCfg ConfigProvider) {
sec := rootCfg.Section("global_lock")
GlobalLock.ServiceType = sec.Key("SERVICE_TYPE").MustString("memory")
switch GlobalLock.ServiceType {
case "memory":
case "redis":
connStr := sec.Key("SERVICE_CONN_STR").String()
if connStr == "" {
log.Fatal("SERVICE_CONN_STR is empty for redis")
}
u := nosql.ToRedisURI(connStr)
if u == nil {
log.Fatal("SERVICE_CONN_STR %s is not a valid redis connection string", connStr)
}
GlobalLock.ServiceConnStr = connStr
default:
log.Fatal("Unknown sync lock service type: %s", GlobalLock.ServiceType)
}
}
35 changes: 35 additions & 0 deletions modules/setting/global_lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package setting

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestLoadGlobalLockConfig(t *testing.T) {
t.Run("DefaultGlobalLockConfig", func(t *testing.T) {
iniStr := ``
cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)

loadGlobalLockFrom(cfg)
assert.EqualValues(t, "memory", GlobalLock.ServiceType)
})

t.Run("RedisGlobalLockConfig", func(t *testing.T) {
iniStr := `
[global_lock]
SERVICE_TYPE = redis
SERVICE_CONN_STR = addrs=127.0.0.1:6379 db=0
`
cfg, err := NewConfigProviderFromData(iniStr)
assert.NoError(t, err)

loadGlobalLockFrom(cfg)
assert.EqualValues(t, "redis", GlobalLock.ServiceType)
assert.EqualValues(t, "addrs=127.0.0.1:6379 db=0", GlobalLock.ServiceConnStr)
})
}
1 change: 1 addition & 0 deletions modules/setting/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func loadCommonSettingsFrom(cfg ConfigProvider) error {
loadGitFrom(cfg)
loadMirrorFrom(cfg)
loadMarkupFrom(cfg)
loadGlobalLockFrom(cfg)
loadOtherFrom(cfg)
return nil
}
Expand Down
69 changes: 0 additions & 69 deletions modules/sync/exclusive_pool.go

This file was deleted.

13 changes: 10 additions & 3 deletions services/pull/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
Expand Down Expand Up @@ -334,9 +335,15 @@ func handler(items ...string) []string {
}

func testPR(id int64) {
pullWorkingPool.CheckIn(fmt.Sprint(id))
defer pullWorkingPool.CheckOut(fmt.Sprint(id))
ctx, _, finished := process.GetManager().AddContext(graceful.GetManager().HammerContext(), fmt.Sprintf("Test PR[%d] from patch checking queue", id))
ctx := graceful.GetManager().HammerContext()
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(id))
if err != nil {
log.Error("lock.Lock(): %v", err)
return
}
defer releaser()

ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Test PR[%d] from patch checking queue", id))
defer finished()

pr, err := issues_model.GetPullRequestByID(ctx, id)
Expand Down
25 changes: 18 additions & 7 deletions services/pull/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/cache"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/httplib"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/references"
Expand Down Expand Up @@ -169,9 +170,6 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
return fmt.Errorf("unable to load head repo: %w", err)
}

pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))

prUnit, err := pr.BaseRepo.GetUnit(ctx, unit.TypePullRequests)
if err != nil {
log.Error("pr.BaseRepo.GetUnit(unit.TypePullRequests): %v", err)
Expand All @@ -184,11 +182,18 @@ func Merge(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.U
return models.ErrInvalidMergeStyle{ID: pr.BaseRepo.ID, Style: mergeStyle}
}

releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()
defer func() {
go AddTestPullRequestTask(doer, pr.BaseRepo.ID, pr.BaseBranch, false, "", "")
}()

_, err = doMergeAndPush(ctx, pr, doer, mergeStyle, expectedHeadCommitID, message, repo_module.PushTriggerPRMergeToBase)
releaser()
if err != nil {
return err
}
Expand Down Expand Up @@ -487,10 +492,14 @@ func CheckPullBranchProtections(ctx context.Context, pr *issues_model.PullReques

// MergedManually mark pr as merged manually
func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, baseGitRepo *git.Repository, commitID string) error {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
wolfogre marked this conversation as resolved.
Show resolved Hide resolved
defer releaser()

if err := db.WithTx(ctx, func(ctx context.Context) error {
err = db.WithTx(ctx, func(ctx context.Context) error {
if err := pr.LoadBaseRepo(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -540,7 +549,9 @@ func MergedManually(ctx context.Context, pr *issues_model.PullRequest, doer *use
return fmt.Errorf("SetMerged failed")
}
return nil
}); err != nil {
})
releaser()
if err != nil {
return err
}

Expand Down
15 changes: 10 additions & 5 deletions services/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@ import (
"code.gitea.io/gitea/modules/container"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
gitea_context "code.gitea.io/gitea/services/context"
issue_service "code.gitea.io/gitea/services/issue"
notify_service "code.gitea.io/gitea/services/notify"
)

// TODO: use clustered lock (unique queue? or *abuse* cache)
var pullWorkingPool = sync.NewExclusivePool()
func getPullWorkingLockKey(prID int64) string {
return fmt.Sprintf("pull_working_%d", prID)
}

// NewPullRequest creates new pull request with labels for repository.
func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *issues_model.Issue, labelIDs []int64, uuids []string, pr *issues_model.PullRequest, assigneeIDs []int64) error {
Expand Down Expand Up @@ -202,8 +203,12 @@ func NewPullRequest(ctx context.Context, repo *repo_model.Repository, issue *iss

// ChangeTargetBranch changes the target branch of this pull request, as the given user.
func ChangeTargetBranch(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.User, targetBranch string) (err error) {
pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()

// Current target branch is already the same
if pr.BaseBranch == targetBranch {
Expand Down
9 changes: 7 additions & 2 deletions services/pull/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/models/unit"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/globallock"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/repository"
)
Expand All @@ -25,8 +26,12 @@ func Update(ctx context.Context, pr *issues_model.PullRequest, doer *user_model.
return fmt.Errorf("update of agit flow pull request's head branch is unsupported")
}

pullWorkingPool.CheckIn(fmt.Sprint(pr.ID))
defer pullWorkingPool.CheckOut(fmt.Sprint(pr.ID))
releaser, err := globallock.Lock(ctx, getPullWorkingLockKey(pr.ID))
if err != nil {
log.Error("lock.Lock(): %v", err)
return fmt.Errorf("lock.Lock: %w", err)
}
defer releaser()

diffCount, err := GetDiverging(ctx, pr)
if err != nil {
Expand Down
Loading