Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful: Xorm, RepoIndexer, Cron and Others #9282

Merged
merged 41 commits into from
Dec 15, 2019
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
5b3fa6d
Graceful: use GetManager instead of global
zeripath Dec 7, 2019
00ddf85
Graceful: Make repo indexer shutdown gracefully
zeripath Dec 7, 2019
b2dea35
Graceful: Make the cron tasks graceful
zeripath Nov 15, 2019
c0ea8ef
Graceful: Make TestPullRequests shutdownable
zeripath Nov 13, 2019
7beda2d
Graceful: AddTestPullRequest run in graceful ctx
zeripath Dec 7, 2019
db022dd
Graceful: SetDefaultContext for Xorm to be HammerContext
zeripath Dec 8, 2019
1aafb56
Avoid starting graceful for migrate commands and checkout
zeripath Dec 8, 2019
eae3144
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 10, 2019
7589a49
Graceful: DeliverHooks now can be shutdown
zeripath Dec 9, 2019
89154f5
Graceful: SyncMirrors shutdown
zeripath Dec 9, 2019
9b5d465
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 11, 2019
d7d6c86
Remove unnecessary ctx check
zeripath Dec 12, 2019
1a79b2f
Fix hammer syncing
zeripath Dec 12, 2019
7ce742c
rename manager.run to manager.start
zeripath Dec 12, 2019
fa751ea
adjust ctx.Done check
zeripath Dec 12, 2019
19f2ca7
Prevent deadlock in mirror.Update on shutdown
zeripath Dec 12, 2019
ef9db2d
Lint doesn't permit passing in ctx nil
zeripath Dec 12, 2019
90bde75
Add FIXME note to indexer/code/bleve.go
zeripath Dec 12, 2019
13a808d
Say CheckRepoStats: Aborting due to Shutdown instead
zeripath Dec 12, 2019
f1c85e0
Update modules/sync/unique_queue.go
zeripath Dec 12, 2019
aa2dabd
Merge branch 'graceful-more-things-graceful' of github.com:zeripath/g…
zeripath Dec 13, 2019
d55db9e
Make channels at start up rather than delayed
zeripath Dec 13, 2019
398ab3b
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 13, 2019
c93f1a7
Make repo indexer shutdown safely
zeripath Dec 13, 2019
4411cae
Update manager.go
zeripath Dec 13, 2019
5c2b081
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 14, 2019
9faf8da
Merge branch 'master' into graceful-more-things-graceful
lunny Dec 14, 2019
cb2ac9b
oops
zeripath Dec 14, 2019
607f2c2
Better error reporting
zeripath Dec 14, 2019
a4a722f
fixup
zeripath Dec 14, 2019
a9269a5
Push TestPullRequests to the UniqueQueue and make UniqueQueue closable
zeripath Dec 14, 2019
3b97005
Ensure webhook queue is also closed to prevent blockage on add here
zeripath Dec 14, 2019
c227a34
Remove unnecessary channel check
zeripath Dec 14, 2019
0e38ea8
Double check that we are not trying to add id to the table again
zeripath Dec 14, 2019
244b54a
Sort results of getunindexrepos by descending id
zeripath Dec 14, 2019
2098dd2
Ensure repos only added once to queue
zeripath Dec 14, 2019
1374ad2
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 14, 2019
d42cf0d
D'oh
zeripath Dec 15, 2019
be4388c
Skip duplicate exist check in addfunc
zeripath Dec 15, 2019
3e79dae
Merge branch 'master' into graceful-more-things-graceful
lunny Dec 15, 2019
caeecd2
Merge branch 'master' into graceful-more-things-graceful
zeripath Dec 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package cmd

import (
"context"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/models/migrations"
"code.gitea.io/gitea/modules/log"
Expand Down Expand Up @@ -32,7 +34,7 @@ func runMigrate(ctx *cli.Context) error {
log.Trace("Log path: %s", setting.LogRootPath)
setting.InitDBConfig()

if err := models.NewEngine(migrations.Migrate); err != nil {
if err := models.NewEngine(context.Background(), migrations.Migrate); err != nil {
log.Fatal("Failed to initialize ORM engine: %v", err)
return err
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package cmd

import (
"context"
"fmt"
"net/http"
_ "net/http/pprof" // Used for debugging if enabled and a web server is running
Expand Down Expand Up @@ -96,6 +97,10 @@ func runLetsEncryptFallbackHandler(w http.ResponseWriter, r *http.Request) {
}

func runWeb(ctx *cli.Context) error {
managerCtx, cancel := context.WithCancel(context.Background())
graceful.InitManager(managerCtx)
defer cancel()

if os.Getppid() > 1 && len(os.Getenv("LISTEN_FDS")) > 0 {
log.Info("Restarting Gitea on PID: %d from parent PID: %d", os.Getpid(), os.Getppid())
} else {
Expand All @@ -108,7 +113,7 @@ func runWeb(ctx *cli.Context) error {
}

// Perform global initialization
routers.GlobalInit()
routers.GlobalInit(graceful.GetManager().HammerContext())

// Set up Macaron
m := routes.NewMacaron()
Expand Down Expand Up @@ -199,8 +204,7 @@ func runWeb(ctx *cli.Context) error {
log.Critical("Failed to start server: %v", err)
}
log.Info("HTTP Listener: %s Closed", listenAddr)
graceful.Manager.WaitForServers()
graceful.Manager.WaitForTerminate()
<-graceful.GetManager().Done()
log.Info("PID: %d Gitea Web Finished", os.Getpid())
log.Close()
return nil
Expand Down
4 changes: 2 additions & 2 deletions cmd/web_graceful.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func runHTTPSWithTLSConfig(network, listenAddr string, tlsConfig *tls.Config, m

// NoHTTPRedirector tells our cleanup routine that we will not be using a fallback http redirector
func NoHTTPRedirector() {
graceful.Manager.InformCleanup()
graceful.GetManager().InformCleanup()
}

// NoMainListener tells our cleanup routine that we will not be using a possibly provided listener
// for our main HTTP/HTTPS service
func NoMainListener() {
graceful.Manager.InformCleanup()
graceful.GetManager().InformCleanup()
}

func runFCGI(network, listenAddr string, m http.Handler) error {
Expand Down
3 changes: 2 additions & 1 deletion contrib/pr/checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Checkout a PR and load the tests data into sqlite database
*/

import (
"context"
"flag"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -92,7 +93,7 @@ func runPR() {
//x, err = xorm.NewEngine("sqlite3", "file::memory:?cache=shared")

var helper testfixtures.Helper = &testfixtures.SQLite{}
models.NewEngine(func(_ *xorm.Engine) error {
models.NewEngine(context.Background(), func(_ *xorm.Engine) error {
return nil
})
models.HasEngine = true
Expand Down
6 changes: 4 additions & 2 deletions integrations/auth_ldap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package integrations

import (
"context"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -147,7 +148,7 @@ func TestLDAPUserSync(t *testing.T) {
}
defer prepareTestEnv(t)()
addAuthSourceLDAP(t, "")
models.SyncExternalUsers()
models.SyncExternalUsers(context.Background())

session := loginUser(t, "user1")
// Check if users exists
Expand Down Expand Up @@ -206,7 +207,8 @@ func TestLDAPUserSSHKeySync(t *testing.T) {
}
defer prepareTestEnv(t)()
addAuthSourceLDAP(t, "sshPublicKey")
models.SyncExternalUsers()

models.SyncExternalUsers(context.Background())

// Check if users has SSH keys synced
for _, u := range gitLDAPUsers {
Expand Down
8 changes: 7 additions & 1 deletion integrations/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package integrations

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
Expand All @@ -24,6 +25,7 @@ import (

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/base"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/routers"
"code.gitea.io/gitea/routers/routes"
Expand Down Expand Up @@ -55,6 +57,10 @@ func NewNilResponseRecorder() *NilResponseRecorder {
}

func TestMain(m *testing.M) {
managerCtx, cancel := context.WithCancel(context.Background())
graceful.InitManager(managerCtx)
defer cancel()

initIntegrationTest()
mac = routes.NewMacaron()
routes.RegisterRoutes(mac)
Expand Down Expand Up @@ -171,7 +177,7 @@ func initIntegrationTest() {
}
defer db.Close()
}
routers.GlobalInit()
routers.GlobalInit(graceful.GetManager().HammerContext())
}

func prepareTestEnv(t testing.TB, skip ...int) func() {
Expand Down
3 changes: 2 additions & 1 deletion integrations/migration-test/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package migrations

import (
"compress/gzip"
"context"
"database/sql"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -220,7 +221,7 @@ func doMigrationTest(t *testing.T, version string) {
err := models.SetEngine()
assert.NoError(t, err)

err = models.NewEngine(wrappedMigrate)
err = models.NewEngine(context.Background(), wrappedMigrate)
assert.NoError(t, err)
currentEngine.Close()
}
Expand Down
4 changes: 3 additions & 1 deletion models/branches.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package models

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -525,7 +526,8 @@ func (deletedBranch *DeletedBranch) LoadUser() {
}

// RemoveOldDeletedBranches removes old deleted branches
func RemoveOldDeletedBranches() {
func RemoveOldDeletedBranches(ctx context.Context) {
// Nothing to do for shutdown or terminate
log.Trace("Doing: DeletedBranchesCleanup")

deleteBefore := time.Now().Add(-setting.Cron.DeletedBranchesCleanup.OlderThan)
Expand Down
5 changes: 4 additions & 1 deletion models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package models

import (
"context"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -164,11 +165,13 @@ func SetEngine() (err error) {
}

// NewEngine initializes a new xorm.Engine
func NewEngine(migrateFunc func(*xorm.Engine) error) (err error) {
func NewEngine(ctx context.Context, migrateFunc func(*xorm.Engine) error) (err error) {
if err = SetEngine(); err != nil {
return err
}

x.SetDefaultContext(ctx)
zeripath marked this conversation as resolved.
Show resolved Hide resolved

if err = x.Ping(); err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions models/pull_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ func GetUnmergedPullRequestsByBaseInfo(repoID int64, branch string) ([]*PullRequ
Find(&prs)
}

// GetPullRequestsByCheckStatus returns all pull requests according the special checking status.
func GetPullRequestsByCheckStatus(status PullRequestStatus) ([]*PullRequest, error) {
prs := make([]*PullRequest, 0, 10)
return prs, x.
// GetPullRequestIDsByCheckStatus returns all pull requests according the special checking status.
func GetPullRequestIDsByCheckStatus(status PullRequestStatus) ([]int64, error) {
prs := make([]int64, 0, 10)
return prs, x.Table("pull_request").
Where("status=?", status).
Cols("pull_request.id").
Find(&prs)
}

Expand Down
64 changes: 56 additions & 8 deletions models/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package models

import (
"bytes"
"context"
"crypto/md5"
"errors"
"fmt"
Expand Down Expand Up @@ -2253,19 +2254,27 @@ func DeleteRepositoryArchives() error {
}

// DeleteOldRepositoryArchives deletes old repository archives.
func DeleteOldRepositoryArchives() {
func DeleteOldRepositoryArchives(ctx context.Context) {
log.Trace("Doing: ArchiveCleanup")

if err := x.Where("id > 0").Iterate(new(Repository), deleteOldRepositoryArchives); err != nil {
if err := x.Where("id > 0").Iterate(new(Repository), func(idx int, bean interface{}) error {
return deleteOldRepositoryArchives(ctx, idx, bean)
}); err != nil {
log.Error("ArchiveClean: %v", err)
}
}

func deleteOldRepositoryArchives(idx int, bean interface{}) error {
func deleteOldRepositoryArchives(ctx context.Context, idx int, bean interface{}) error {
repo := bean.(*Repository)
basePath := filepath.Join(repo.RepoPath(), "archives")

for _, ty := range []string{"zip", "targz"} {
select {
case <-ctx.Done():
return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s", repo, ty)
default:
}

path := filepath.Join(basePath, ty)
file, err := os.Open(path)
if err != nil {
Expand All @@ -2288,6 +2297,11 @@ func deleteOldRepositoryArchives(idx int, bean interface{}) error {
minimumOldestTime := time.Now().Add(-setting.Cron.ArchiveCleanup.OlderThan)
for _, info := range files {
if info.ModTime().Before(minimumOldestTime) && !info.IsDir() {
select {
case <-ctx.Done():
return fmt.Errorf("Aborted due to shutdown:\nin delete of old repository archives %v\nat delete file %s - %s", repo, ty, info.Name())
default:
}
toDelete := filepath.Join(path, info.Name())
// This is a best-effort purge, so we do not check error codes to confirm removal.
if err = os.Remove(toDelete); err != nil {
Expand Down Expand Up @@ -2381,13 +2395,17 @@ func SyncRepositoryHooks() error {
}

// GitFsck calls 'git fsck' to check repository health.
func GitFsck() {
func GitFsck(ctx context.Context) {
log.Trace("Doing: GitFsck")

if err := x.
Where("id>0 AND is_fsck_enabled=?", true).BufferSize(setting.Database.IterateBufferSize).
Iterate(new(Repository),
func(idx int, bean interface{}) error {
select {
case <-ctx.Done():
return fmt.Errorf("Aborted due to shutdown")
default:
}
repo := bean.(*Repository)
repoPath := repo.RepoPath()
log.Trace("Running health check on repository %s", repoPath)
Expand Down Expand Up @@ -2433,13 +2451,19 @@ type repoChecker struct {
desc string
}

func repoStatsCheck(checker *repoChecker) {
func repoStatsCheck(ctx context.Context, checker *repoChecker) {
results, err := x.Query(checker.querySQL)
if err != nil {
log.Error("Select %s: %v", checker.desc, err)
return
}
for _, result := range results {
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating %s: %d", checker.desc, id)
_, err = x.Exec(checker.correctSQL, id, id)
Expand All @@ -2450,7 +2474,7 @@ func repoStatsCheck(checker *repoChecker) {
}

// CheckRepoStats checks the repository stats
func CheckRepoStats() {
func CheckRepoStats(ctx context.Context) {
log.Trace("Doing: CheckRepoStats")

checkers := []*repoChecker{
Expand Down Expand Up @@ -2486,7 +2510,13 @@ func CheckRepoStats() {
},
}
for i := range checkers {
repoStatsCheck(checkers[i])
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
default:
repoStatsCheck(ctx, checkers[i])
}
}

// ***** START: Repository.NumClosedIssues *****
Expand All @@ -2496,6 +2526,12 @@ func CheckRepoStats() {
log.Error("Select %s: %v", desc, err)
} else {
for _, result := range results {
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating %s: %d", desc, id)
_, err = x.Exec("UPDATE `repository` SET num_closed_issues=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, false, id)
Expand All @@ -2513,6 +2549,12 @@ func CheckRepoStats() {
log.Error("Select %s: %v", desc, err)
} else {
for _, result := range results {
select {
case <-ctx.Done():
zeripath marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating %s: %d", desc, id)
_, err = x.Exec("UPDATE `repository` SET num_closed_pulls=(SELECT COUNT(*) FROM `issue` WHERE repo_id=? AND is_closed=? AND is_pull=?) WHERE id=?", id, true, true, id)
Expand All @@ -2530,6 +2572,12 @@ func CheckRepoStats() {
log.Error("Select repository count 'num_forks': %v", err)
} else {
for _, result := range results {
select {
case <-ctx.Done():
log.Warn("CheckRepoStats: Aborting due to shutdown")
return
default:
}
id := com.StrTo(result["id"]).MustInt64()
log.Trace("Updating repository count 'num_forks': %d", id)

Expand Down
Loading