From eb43091e0601fb7610a79423b3ba5866b15873c9 Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Sat, 7 Dec 2024 20:34:56 +0800 Subject: [PATCH 1/2] support concurrency --- models/actions/run.go | 149 +++---- models/actions/run_job.go | 119 +++++- models/actions/run_job_list.go | 22 +- models/actions/run_list.go | 20 +- models/actions/task.go | 4 + models/migrations/v1_23/v312.go | 29 ++ routers/web/repo/actions/view.go | 66 ++- services/actions/concurrency.go | 76 ++++ services/actions/job_emitter.go | 164 ++++++- services/actions/job_emitter_test.go | 5 +- services/actions/notifier_helper.go | 33 +- services/actions/run.go | 138 ++++++ services/actions/schedule_tasks.go | 2 +- tests/integration/actions_concurrency_test.go | 403 ++++++++++++++++++ tests/integration/actions_runner_test.go | 17 +- 15 files changed, 1095 insertions(+), 152 deletions(-) create mode 100644 models/migrations/v1_23/v312.go create mode 100644 services/actions/concurrency.go create mode 100644 services/actions/run.go create mode 100644 tests/integration/actions_concurrency_test.go diff --git a/models/actions/run.go b/models/actions/run.go index 60fbbcd3233ef..4b31c0734ec8c 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -20,7 +20,6 @@ import ( "code.gitea.io/gitea/modules/util" webhook_module "code.gitea.io/gitea/modules/webhook" - "github.com/nektos/act/pkg/jobparser" "xorm.io/builder" ) @@ -47,6 +46,8 @@ type ActionRun struct { TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow Status Status `xorm:"index"` Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed + ConcurrencyGroup string + ConcurrencyCancel bool // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 Started timeutil.TimeStamp Stopped timeutil.TimeStamp @@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool { return run.ScheduleID > 0 } -func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { +func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error { _, err := db.GetEngine(ctx).ID(repo.ID). SetExpr("num_action_runs", builder.Select("count(*)").From("action_run"). @@ -222,38 +223,8 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin return err } - // Iterate over each job and attempt to cancel it. - for _, job := range jobs { - // Skip jobs that are already in a terminal state (completed, cancelled, etc.). - status := job.Status - if status.IsDone() { - continue - } - - // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. - if job.TaskID == 0 { - job.Status = StatusCancelled - job.Stopped = timeutil.TimeStampNow() - - // Update the job's status and stopped time in the database. - n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") - if err != nil { - return err - } - - // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. - if n == 0 { - return fmt.Errorf("job has changed, try again") - } - - // Continue with the next job. - continue - } - - // If the job has an associated task, try to stop the task, effectively cancelling the job. - if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { - return err - } + if err := CancelJobs(ctx, jobs); err != nil { + return err } } @@ -261,80 +232,41 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin return nil } -// InsertRun inserts a run -// The title will be cut off at 255 characters if it's longer than 255 characters. -func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error { - ctx, committer, err := db.TxContext(ctx) - if err != nil { - return err - } - defer committer.Close() - - index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) - if err != nil { - return err - } - run.Index = index - run.Title = util.EllipsisDisplayString(run.Title, 255) +func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error { + // Iterate over each job and attempt to cancel it. + for _, job := range jobs { + // Skip jobs that are already in a terminal state (completed, cancelled, etc.). + status := job.Status + if status.IsDone() { + continue + } - if err := db.Insert(ctx, run); err != nil { - return err - } + // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. + if job.TaskID == 0 { + job.Status = StatusCancelled + job.Stopped = timeutil.TimeStampNow() - if run.Repo == nil { - repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) - if err != nil { - return err - } - run.Repo = repo - } + // Update the job's status and stopped time in the database. + n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + if err != nil { + return err + } - if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { - return err - } + // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. + if n == 0 { + return fmt.Errorf("job has changed, try again") + } - runJobs := make([]*ActionRunJob, 0, len(jobs)) - var hasWaiting bool - for _, v := range jobs { - id, job := v.Job() - needs := job.Needs() - if err := v.SetJob(id, job.EraseNeeds()); err != nil { - return err + // Continue with the next job. + continue } - payload, _ := v.Marshal() - status := StatusWaiting - if len(needs) > 0 || run.NeedApproval { - status = StatusBlocked - } else { - hasWaiting = true - } - job.Name = util.EllipsisDisplayString(job.Name, 255) - runJobs = append(runJobs, &ActionRunJob{ - RunID: run.ID, - RepoID: run.RepoID, - OwnerID: run.OwnerID, - CommitSHA: run.CommitSHA, - IsForkPullRequest: run.IsForkPullRequest, - Name: job.Name, - WorkflowPayload: payload, - JobID: id, - Needs: needs, - RunsOn: job.RunsOn(), - Status: status, - }) - } - if err := db.Insert(ctx, runJobs); err != nil { - return err - } - // if there is a job in the waiting status, increase tasks version. - if hasWaiting { - if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + // If the job has an associated task, try to stop the task, effectively cancelling the job. + if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil { return err } } - - return committer.Commit() + return nil } func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) { @@ -426,7 +358,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { } run.Repo = repo } - if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil { + if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil { return err } } @@ -435,3 +367,20 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { } type ActionRunIndex db.ResourceIndex + +func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) { + if actionRun.ConcurrencyGroup != "" || actionRun.ConcurrencyCancel { + return false, nil + } + + concurrentRunsNum, err := db.Count[ActionRun](ctx, &FindRunOptions{ + RepoID: actionRun.RepoID, + ConcurrencyGroup: actionRun.ConcurrencyGroup, + Status: []Status{StatusWaiting, StatusRunning}, + }) + if err != nil { + return false, fmt.Errorf("count running and waiting runs: %w", err) + } + + return concurrentRunsNum > 0, nil +} diff --git a/models/actions/run_job.go b/models/actions/run_job.go index de4b6aab66701..ccc1605deaf6b 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -33,10 +33,17 @@ type ActionRunJob struct { RunsOn []string `xorm:"JSON TEXT"` TaskID int64 // the latest task of the job Status Status `xorm:"index"` - Started timeutil.TimeStamp - Stopped timeutil.TimeStamp - Created timeutil.TimeStamp `xorm:"created"` - Updated timeutil.TimeStamp `xorm:"updated index"` + + RawConcurrencyGroup string // raw concurrency.group + RawConcurrencyCancel string // raw concurrency.cancel-in-progress + IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty + ConcurrencyGroup string // evaluated concurrency.group + ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress + + Started timeutil.TimeStamp + Stopped timeutil.TimeStamp + Created timeutil.TimeStamp `xorm:"created"` + Updated timeutil.TimeStamp `xorm:"updated index"` } func init() { @@ -184,3 +191,107 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status { return StatusUnknown // it shouldn't happen } } + +func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) { + if job.RawConcurrencyGroup != "" { + return false, nil + } + if !job.IsConcurrencyEvaluated { + return false, ErrUnevaluatedConcurrency{ + Group: job.RawConcurrencyGroup, + CancelInProgress: job.RawConcurrencyCancel, + } + } + if job.ConcurrencyGroup != "" || job.ConcurrencyCancel { + return false, nil + } + + concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: []Status{StatusRunning, StatusWaiting}, + }) + if err != nil { + return false, fmt.Errorf("count running and waiting jobs: %w", err) + } + if concurrentJobsNum > 0 { + return true, nil + } + + if err := job.LoadRun(ctx); err != nil { + return false, fmt.Errorf("load run: %w", err) + } + + return ShouldBlockRunByConcurrency(ctx, job.Run) +} + +func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) error { + if job.RawConcurrencyGroup != "" { + if !job.IsConcurrencyEvaluated { + return ErrUnevaluatedConcurrency{ + Group: job.RawConcurrencyGroup, + CancelInProgress: job.RawConcurrencyCancel, + } + } + if job.ConcurrencyGroup != "" && job.ConcurrencyCancel { + // cancel previous jobs in the same concurrency group + previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked}, + }) + if err != nil { + return fmt.Errorf("find previous jobs: %w", err) + } + previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) + if err := CancelJobs(ctx, previousJobs); err != nil { + return fmt.Errorf("cancel previous jobs: %w", err) + } + } + } + + if err := job.LoadRun(ctx); err != nil { + return fmt.Errorf("load run: %w", err) + } + if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel { + // cancel previous runs in the same concurrency group + runs, err := db.Find[ActionRun](ctx, &FindRunOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.Run.ConcurrencyGroup, + Status: []Status{StatusRunning, StatusWaiting, StatusBlocked}, + }) + if err != nil { + return fmt.Errorf("find runs: %w", err) + } + for _, run := range runs { + if run.ID == job.Run.ID { + continue + } + jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{ + RunID: run.ID, + }) + if err != nil { + return fmt.Errorf("find run %d jobs: %w", run.ID, err) + } + if err := CancelJobs(ctx, jobs); err != nil { + return fmt.Errorf("cancel run %d jobs: %w", run.ID, err) + } + } + } + + return nil +} + +type ErrUnevaluatedConcurrency struct { + Group string + CancelInProgress string +} + +func IsErrUnevaluatedConcurrency(err error) bool { + _, ok := err.(ErrUnevaluatedConcurrency) + return ok +} + +func (err ErrUnevaluatedConcurrency) Error() string { + return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] has not been evaluated", err.Group, err.CancelInProgress) +} diff --git a/models/actions/run_job_list.go b/models/actions/run_job_list.go index 6c5d3b3252ebf..e1c2770c83594 100644 --- a/models/actions/run_job_list.go +++ b/models/actions/run_job_list.go @@ -46,14 +46,21 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err return jobs.LoadRuns(ctx, withRepo) } +func GetRunsByIDs(ctx context.Context, runIDs []int64) (RunList, error) { + runList := make(RunList, 0, len(runIDs)) + err := db.GetEngine(ctx).In("id", runIDs).Find(&runList) + return runList, err +} + type FindRunJobOptions struct { db.ListOptions - RunID int64 - RepoID int64 - OwnerID int64 - CommitSHA string - Statuses []Status - UpdatedBefore timeutil.TimeStamp + RunID int64 + RepoID int64 + OwnerID int64 + CommitSHA string + Statuses []Status + UpdatedBefore timeutil.TimeStamp + ConcurrencyGroup string } func (opts FindRunJobOptions) ToConds() builder.Cond { @@ -76,5 +83,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond { if opts.UpdatedBefore > 0 { cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore}) } + if opts.ConcurrencyGroup != "" { + cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup}) + } return cond } diff --git a/models/actions/run_list.go b/models/actions/run_list.go index 4046c7d369436..2339e139147ca 100644 --- a/models/actions/run_list.go +++ b/models/actions/run_list.go @@ -63,14 +63,15 @@ func (runs RunList) LoadRepos(ctx context.Context) error { type FindRunOptions struct { db.ListOptions - RepoID int64 - OwnerID int64 - WorkflowID string - Ref string // the commit/tag/… that caused this workflow - TriggerUserID int64 - TriggerEvent webhook_module.HookEventType - Approved bool // not util.OptionalBool, it works only when it's true - Status []Status + RepoID int64 + OwnerID int64 + WorkflowID string + Ref string // the commit/tag/… that caused this workflow + TriggerUserID int64 + TriggerEvent webhook_module.HookEventType + Approved bool // not util.OptionalBool, it works only when it's true + Status []Status + ConcurrencyGroup string } func (opts FindRunOptions) ToConds() builder.Cond { @@ -99,6 +100,9 @@ func (opts FindRunOptions) ToConds() builder.Cond { if opts.TriggerEvent != "" { cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent}) } + if len(opts.ConcurrencyGroup) > 0 { + cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup}) + } return cond } diff --git a/models/actions/task.go b/models/actions/task.go index 9f13ff94c9e4a..daf78809d05d5 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -320,6 +320,10 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask return nil, false, nil } + if err := CancelPreviousJobsByConcurrency(ctx, job); err != nil { + return nil, false, err + } + task.Job = job if err := committer.Commit(); err != nil { diff --git a/models/migrations/v1_23/v312.go b/models/migrations/v1_23/v312.go new file mode 100644 index 0000000000000..9f9e9ed42b1aa --- /dev/null +++ b/models/migrations/v1_23/v312.go @@ -0,0 +1,29 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_23 //nolint + +import ( + "xorm.io/xorm" +) + +func AddActionsConcurrency(x *xorm.Engine) error { + type ActionRun struct { + ConcurrencyGroup string + ConcurrencyCancel bool + } + + if err := x.Sync(new(ActionRun)); err != nil { + return err + } + + type ActionRunJob struct { + RawConcurrencyGroup string + RawConcurrencyCancel string + IsConcurrencyEvaluated bool + ConcurrencyGroup string + ConcurrencyCancel bool + } + + return x.Sync(new(ActionRunJob)) +} diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 9a18ca530582f..0f4ff7ed15e66 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -442,8 +442,35 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou job.Started = 0 job.Stopped = 0 + job.ConcurrencyGroup = "" + job.ConcurrencyCancel = false + job.IsConcurrencyEvaluated = false + if err := job.LoadRun(ctx); err != nil { + return err + } + vars, err := actions_model.GetVariablesOfRun(ctx, job.Run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", job.Run.ID, err) + } + if job.RawConcurrencyGroup != "" && job.Status != actions_model.StatusBlocked { + var err error + job.ConcurrencyGroup, job.ConcurrencyCancel, err = actions_service.EvaluateJobConcurrency(job.Run, job, vars, nil) + if err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + job.IsConcurrencyEvaluated = true + blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, job) + if err != nil { + return err + } + if blockByConcurrency { + job.Status = actions_model.StatusBlocked + } + } + if err := db.WithTx(ctx, func(ctx context.Context) error { - _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "task_id", "status", "started", "stopped") + updateCols := []string{"task_id", "status", "started", "stopped", "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"} + _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, updateCols...) return err }); err != nil { return err @@ -560,7 +587,14 @@ func Approve(ctx *context_module.Context) { return err } for _, job := range jobs { - if len(job.Needs) == 0 && job.Status.IsBlocked() { + blockJobByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, job) + if err != nil { + if actions_model.IsErrUnevaluatedConcurrency(err) { + continue + } + return err + } + if len(job.Needs) == 0 && job.Status.IsBlocked() && !blockJobByConcurrency { job.Status = actions_model.StatusWaiting _, err := actions_model.UpdateRunJob(ctx, job, nil, "status") if err != nil { @@ -820,7 +854,10 @@ func Run(ctx *context_module.Context) { } // find workflow from commit - var workflows []*jobparser.SingleWorkflow + var ( + workflows []*jobparser.SingleWorkflow + wfRawConcurrency *model.RawConcurrency + ) for _, entry := range entries { if entry.Name() == workflowID { content, err := actions.GetContentFromEntry(entry) @@ -833,6 +870,11 @@ func Run(ctx *context_module.Context) { ctx.ServerError("workflow", err) return } + wfRawConcurrency, err = jobparser.ReadWorkflowRawConcurrency(content) + if err != nil { + ctx.ServerError("read workflow concurrency", err) + return + } break } } @@ -896,6 +938,22 @@ func Run(ctx *context_module.Context) { EventPayload: string(eventPayload), Status: actions_model.StatusWaiting, } + if wfRawConcurrency != nil { + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + ctx.ServerError("GetVariablesOfRun", err) + return + } + wfConcurrencyGroup, wfConcurrencyCancel, err := actions_service.EvaluateWorkflowConcurrency(run, wfRawConcurrency, vars) + if err != nil { + ctx.ServerError("EvaluateWorkflowConcurrency", err) + return + } + if wfConcurrencyGroup != "" { + run.ConcurrencyGroup = wfConcurrencyGroup + run.ConcurrencyCancel = wfConcurrencyCancel + } + } // cancel running jobs of the same workflow if err := actions_model.CancelPreviousJobs( @@ -909,7 +967,7 @@ func Run(ctx *context_module.Context) { } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + if err := actions_service.InsertRun(ctx, run, workflows); err != nil { ctx.ServerError("workflow", err) return } diff --git a/services/actions/concurrency.go b/services/actions/concurrency.go new file mode 100644 index 0000000000000..dacb44ae46385 --- /dev/null +++ b/services/actions/concurrency.go @@ -0,0 +1,76 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/modules/json" + api "code.gitea.io/gitea/modules/structs" + + "github.com/nektos/act/pkg/jobparser" + act_model "github.com/nektos/act/pkg/model" +) + +func EvaluateWorkflowConcurrency(run *actions_model.ActionRun, rc *act_model.RawConcurrency, vars map[string]string) (string, bool, error) { + gitCtx := jobparser.ToGitContext(GenerateGiteaContext(run, nil)) + jobResults := map[string]*jobparser.JobResult{"": {}} + inputs, err := getInputsFromRun(run) + if err != nil { + return "", false, fmt.Errorf("get inputs: %w", err) + } + + concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(rc, "", nil, gitCtx, jobResults, vars, inputs) + if err != nil { + return "", false, fmt.Errorf("evaluate concurrency: %w", err) + } + + return concurrencyGroup, concurrencyCancel, nil +} + +func EvaluateJobConcurrency(run *actions_model.ActionRun, actionRunJob *actions_model.ActionRunJob, vars map[string]string, jobResults map[string]*jobparser.JobResult) (string, bool, error) { + rawConcurrency := &act_model.RawConcurrency{ + Group: actionRunJob.RawConcurrencyGroup, + CancelInProgress: actionRunJob.RawConcurrencyCancel, + } + + singleWorkflows, err := jobparser.Parse([]byte(actionRunJob.WorkflowPayload)) + if err != nil { + return "", false, fmt.Errorf("parse single workflow: %w", err) + } else if len(singleWorkflows) != 1 { + return "", false, fmt.Errorf("not single workflow") + } + _, singleWorkflowJob := singleWorkflows[0].Job() + + gitCtx := jobparser.ToGitContext(GenerateGiteaContext(run, actionRunJob)) + if jobResults == nil { + jobResults = map[string]*jobparser.JobResult{} + } + jobResults[actionRunJob.JobID] = &jobparser.JobResult{ + Needs: actionRunJob.Needs, + } + inputs, err := getInputsFromRun(run) + if err != nil { + return "", false, fmt.Errorf("get inputs: %w", err) + } + + concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs) + if err != nil { + return "", false, fmt.Errorf("evaluate concurrency: %w", err) + } + + return concurrencyGroup, concurrencyCancel, nil +} + +func getInputsFromRun(run *actions_model.ActionRun) (map[string]any, error) { + if run.Event != "workflow_dispatch" { + return map[string]any{}, nil + } + var payload api.WorkflowDispatchPayload + if err := json.Unmarshal([]byte(run.EventPayload), &payload); err != nil { + return nil, err + } + return payload.Inputs, nil +} diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 1f859fcf70506..01c3d2bcf21eb 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -10,7 +10,9 @@ import ( actions_model "code.gitea.io/gitea/models/actions" "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/container" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/queue" "github.com/nektos/act/pkg/jobparser" @@ -37,25 +39,117 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { ctx := graceful.GetManager().ShutdownContext() var ret []*jobUpdate for _, update := range items { - if err := checkJobsOfRun(ctx, update.RunID); err != nil { + if err := checkJobsByRunID(ctx, update.RunID); err != nil { + log.Error("check run %d: %v", update.RunID, err) ret = append(ret, update) } } return ret } -func checkJobsOfRun(ctx context.Context, runID int64) error { - jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID}) +func checkJobsByRunID(ctx context.Context, runID int64) error { + run, err := actions_model.GetRunByID(ctx, runID) + if err != nil { + return fmt.Errorf("get action run: %w", err) + } + + return db.WithTx(ctx, func(ctx context.Context) error { + // check jobs of the current run + if err := checkJobsOfRun(ctx, run); err != nil { + return err + } + + // check run (workflow-level) concurrency + concurrentRunIDs := make(container.Set[int64]) + if run.ConcurrencyGroup != "" { + concurrentRuns, err := db.Find[actions_model.ActionRun](ctx, actions_model.FindRunOptions{ + RepoID: run.RepoID, + ConcurrencyGroup: run.ConcurrencyGroup, + Status: []actions_model.Status{actions_model.StatusBlocked}, + }) + if err != nil { + return err + } + for _, concurrentRun := range concurrentRuns { + concurrentRunIDs.Add(concurrentRun.ID) + if concurrentRun.NeedApproval { + continue + } + if err := checkJobsOfRun(ctx, concurrentRun); err != nil { + return err + } + updatedRun, err := actions_model.GetRunByID(ctx, concurrentRun.ID) + if err != nil { + return err + } + if updatedRun.Status == actions_model.StatusWaiting { + // only run one blocked action run in the same concurrency group + break + } + } + } + + // check job concurrency + concurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) + if err != nil { + return err + } + for _, job := range concurrentJobs { + if job.Status.IsDone() && job.ConcurrencyGroup != "" { + concurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + RepoID: job.RepoID, + ConcurrencyGroup: job.ConcurrencyGroup, + Statuses: []actions_model.Status{actions_model.StatusBlocked}, + }) + if err != nil { + return err + } + for _, concurrentJob := range concurrentJobs { + if concurrentRunIDs.Contains(concurrentJob.RunID) { + continue + } + concurrentRun, err := actions_model.GetRunByID(ctx, concurrentJob.RunID) + if err != nil { + return err + } + if concurrentRun.NeedApproval { + continue + } + if err := checkJobsOfRun(ctx, concurrentRun); err != nil { + return err + } + updatedJob, err := actions_model.GetRunJobByID(ctx, concurrentJob.ID) + if err != nil { + return err + } + if updatedJob.Status == actions_model.StatusWaiting { + break + } + } + } + } + + return nil + }) +} + +func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) error { + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID}) if err != nil { return err } + + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", run.ID, err) + } + if err := db.WithTx(ctx, func(ctx context.Context) error { - idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) for _, job := range jobs { - idToJobs[job.JobID] = append(idToJobs[job.JobID], job) + job.Run = run } - updates := newJobStatusResolver(jobs).Resolve() + updates := newJobStatusResolver(jobs, vars).Resolve(ctx) for _, job := range jobs { if status, ok := updates[job.ID]; ok { job.Status = status @@ -78,9 +172,10 @@ type jobStatusResolver struct { statuses map[int64]actions_model.Status needs map[int64][]int64 jobMap map[int64]*actions_model.ActionRunJob + vars map[string]string } -func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { +func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver { idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs)) jobMap := make(map[int64]*actions_model.ActionRunJob) for _, job := range jobs { @@ -102,13 +197,14 @@ func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver { statuses: statuses, needs: needs, jobMap: jobMap, + vars: vars, } } -func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) Resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for i := 0; i < len(r.statuses); i++ { - updated := r.resolve() + updated := r.resolve(ctx) if len(updated) == 0 { return ret } @@ -120,7 +216,7 @@ func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status { return ret } -func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { +func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status { ret := map[int64]actions_model.Status{} for id, status := range r.statuses { if status != actions_model.StatusBlocked { @@ -137,6 +233,17 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { } } if allDone { + // check concurrency + blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars) + if err != nil { + log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err) + continue + } + + if blockedByJobConcurrency { + continue + } + if allSucceed { ret[id] = actions_model.StatusWaiting } else { @@ -160,3 +267,40 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { } return ret } + +func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) { + if len(actionRunJob.RawConcurrencyGroup) == 0 { + return false, nil + } + if err := actionRunJob.LoadAttributes(ctx); err != nil { + return false, err + } + + if !actionRunJob.IsConcurrencyEvaluated { + // empty concurrency group means the raw concurrency has not been evaluated + taskNeeds, err := FindTaskNeeds(ctx, actionRunJob) + if err != nil { + return false, fmt.Errorf("find task needs: %w", err) + } + jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds)) + for jobID, taskNeed := range taskNeeds { + jobResult := &jobparser.JobResult{ + Result: taskNeed.Result.String(), + Outputs: taskNeed.Outputs, + } + jobResults[jobID] = jobResult + } + + actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(actionRunJob.Run, actionRunJob, vars, jobResults) + if err != nil { + return false, fmt.Errorf("evaluate job concurrency: %w", err) + } + actionRunJob.IsConcurrencyEvaluated = true + + if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil { + return false, fmt.Errorf("update run job: %w", err) + } + } + + return actions_model.ShouldBlockJobByConcurrency(ctx, actionRunJob) +} diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 58c2dc3b242bb..5fe9c59dc32d9 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -4,6 +4,7 @@ package actions import ( + "context" "testing" actions_model "code.gitea.io/gitea/models/actions" @@ -129,8 +130,8 @@ jobs: } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := newJobStatusResolver(tt.jobs) - assert.Equal(t, tt.want, r.Resolve()) + r := newJobStatusResolver(tt.jobs, nil) + assert.Equal(t, tt.want, r.Resolve(context.Background())) }) } } diff --git a/services/actions/notifier_helper.go b/services/actions/notifier_helper.go index 323c6a76e422c..0d7d2f3b04dc7 100644 --- a/services/actions/notifier_helper.go +++ b/services/actions/notifier_helper.go @@ -332,27 +332,30 @@ func handleWorkflows( continue } - jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars)) + wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(dwf.Content) if err != nil { - log.Error("jobparser.Parse: %v", err) + log.Error("ReadWorkflowRawConcurrency: %v", err) continue } - - // cancel running jobs if the event is push or pull_request_sync - if run.Event == webhook_module.HookEventPush || - run.Event == webhook_module.HookEventPullRequestSync { - if err := actions_model.CancelPreviousJobs( - ctx, - run.RepoID, - run.Ref, - run.WorkflowID, - run.Event, - ); err != nil { - log.Error("CancelPreviousJobs: %v", err) + if wfRawConcurrency != nil { + wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(run, wfRawConcurrency, vars) + if err != nil { + log.Error("EvaluateWorkflowConcurrency: %v", err) + continue } + if wfConcurrencyGroup != "" { + run.ConcurrencyGroup = wfConcurrencyGroup + run.ConcurrencyCancel = wfConcurrencyCancel + } + } + + jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars)) + if err != nil { + log.Error("jobparser.Parse: %v", err) + continue } - if err := actions_model.InsertRun(ctx, run, jobs); err != nil { + if err := InsertRun(ctx, run, jobs); err != nil { log.Error("InsertRun: %v", err) continue } diff --git a/services/actions/run.go b/services/actions/run.go new file mode 100644 index 0000000000000..95e078f90a60a --- /dev/null +++ b/services/actions/run.go @@ -0,0 +1,138 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package actions + +import ( + "context" + "fmt" + + actions_model "code.gitea.io/gitea/models/actions" + "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/util" + + "github.com/nektos/act/pkg/jobparser" +) + +// InsertRun inserts a run +// The title will be cut off at 255 characters if it's longer than 255 characters. +func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) error { + ctx, committer, err := db.TxContext(ctx) + if err != nil { + return err + } + defer committer.Close() + + index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID) + if err != nil { + return err + } + run.Index = index + run.Title = util.EllipsisDisplayString(run.Title, 255) + + // check run (workflow-level) concurrency + blockRunByConcurrency, err := actions_model.ShouldBlockRunByConcurrency(ctx, run) + if err != nil { + return err + } + if blockRunByConcurrency { + run.Status = actions_model.StatusBlocked + } + + if err := db.Insert(ctx, run); err != nil { + return err + } + + if run.Repo == nil { + repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID) + if err != nil { + return err + } + run.Repo = repo + } + + if err := actions_model.UpdateRepoRunsNumbers(ctx, run.Repo); err != nil { + return err + } + + // query vars for evaluating job concurrency groups + vars, err := actions_model.GetVariablesOfRun(ctx, run) + if err != nil { + return fmt.Errorf("get run %d variables: %w", run.ID, err) + } + + runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs)) + var hasWaiting bool + for _, v := range jobs { + id, job := v.Job() + needs := job.Needs() + if err := v.SetJob(id, job.EraseNeeds()); err != nil { + return err + } + payload, _ := v.Marshal() + status := actions_model.StatusWaiting + if len(needs) > 0 || run.NeedApproval || run.Status == actions_model.StatusBlocked { + status = actions_model.StatusBlocked + } else { + hasWaiting = true + } + job.Name = util.EllipsisDisplayString(job.Name, 255) + runJob := &actions_model.ActionRunJob{ + RunID: run.ID, + RepoID: run.RepoID, + OwnerID: run.OwnerID, + CommitSHA: run.CommitSHA, + IsForkPullRequest: run.IsForkPullRequest, + Name: job.Name, + WorkflowPayload: payload, + JobID: id, + Needs: needs, + RunsOn: job.RunsOn(), + Status: status, + } + + // check job concurrency + if job.RawConcurrency != nil && job.RawConcurrency.Group != "" { + runJob.RawConcurrencyGroup = job.RawConcurrency.Group + runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress + // we do not need to evaluate job concurrency if the job is blocked because it will be checked by job emitter + if runJob.Status != actions_model.StatusBlocked { + var err error + runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = EvaluateJobConcurrency(run, runJob, vars, nil) + if err != nil { + return fmt.Errorf("evaluate job concurrency: %w", err) + } + runJob.IsConcurrencyEvaluated = true + // check if the job should be blocked by job concurrency + blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, runJob) + if err != nil { + return err + } + if blockByConcurrency { + runJob.Status = actions_model.StatusBlocked + } + } + } + + runJobs = append(runJobs, runJob) + } + + if err := db.Insert(ctx, runJobs); err != nil { + return err + } + + run.Status = actions_model.AggregateJobStatus(runJobs) + if err := actions_model.UpdateRun(ctx, run, "status"); err != nil { + return err + } + + // if there is a job in the waiting status, increase tasks version. + if hasWaiting { + if err := actions_model.IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil { + return err + } + } + + return committer.Commit() +} diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go index 18f3324fd2c26..4f6cc47036ff1 100644 --- a/services/actions/schedule_tasks.go +++ b/services/actions/schedule_tasks.go @@ -145,7 +145,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) } // Insert the action run and its associated jobs into the database - if err := actions_model.InsertRun(ctx, run, workflows); err != nil { + if err := InsertRun(ctx, run, workflows); err != nil { return err } diff --git a/tests/integration/actions_concurrency_test.go b/tests/integration/actions_concurrency_test.go new file mode 100644 index 0000000000000..43a8295c7b8c8 --- /dev/null +++ b/tests/integration/actions_concurrency_test.go @@ -0,0 +1,403 @@ +package integration + +import ( + "encoding/base64" + "fmt" + "net/http" + "net/url" + "slices" + "testing" + "time" + + runnerv1 "code.gitea.io/actions-proto-go/runner/v1" + actions_model "code.gitea.io/gitea/models/actions" + auth_model "code.gitea.io/gitea/models/auth" + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/models/unittest" + user_model "code.gitea.io/gitea/models/user" + api "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/util" + + "github.com/stretchr/testify/assert" +) + +func TestWorkflowConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}) + + // add a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/myvar", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "abc123", + }). + AddTokenAuth(token) + MakeRequest(t, req, http.StatusNoContent) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +concurrency: + group: workflow-main-abc123 +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +concurrency: + group: workflow-${{ github.ref_name }}-${{ vars.myvar }} +jobs: + wf2-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +concurrency: + group: workflow-main-abc${{ 123 }} +jobs: + wf3-job: + runs-on: ubuntu-latest + steps: + - run: echo 'job from workflow3' +` + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + + // fetch and exec workflow1, workflow2 and workflow3 are blocked + task := runner.fetchTask(t) + _, _, run := getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123", run.ConcurrencyGroup) + assert.Equal(t, "concurrent-workflow-1.yml", run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch workflow2 or workflow3 + workflowNames := []string{"concurrent-workflow-2.yml", "concurrent-workflow-3.yml"} + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Contains(t, workflowNames, run.WorkflowID) + workflowNames = slices.DeleteFunc(workflowNames, func(wfn string) bool { return wfn == run.WorkflowID }) + assert.Equal(t, "workflow-main-abc123", run.ConcurrencyGroup) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + // fetch the last workflow (workflow2 or workflow3) + task = runner.fetchTask(t) + _, _, run = getTaskAndJobAndRunByTaskID(t, task.Id) + assert.Equal(t, "workflow-main-abc123", run.ConcurrencyGroup) + assert.Equal(t, workflowNames[0], run.WorkflowID) + runner.fetchNoTask(t) + runner.execTask(t, task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + doAPIDeleteRepository(httpContext)(t) + }) +} + +func TestWorkflowConcurrency_WithPullRequest(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + // user2 is the owner of the base repo + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + user2Session := loginUser(t, user2.Name) + user2Token := getTokenForLoggedInUser(t, user2Session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + // user4 is the owner of the forked repo + user4 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 4}) + user4Token := getTokenForLoggedInUser(t, loginUser(t, user4.Name), auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiBaseRepo := createActionsTestRepo(t, user2Token, "actions-concurrency", false) + baseRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiBaseRepo.ID}) + user2APICtx := NewAPITestContext(t, baseRepo.OwnerName, baseRepo.Name, auth_model.AccessTokenScopeWriteRepository) + + runner := newMockRunner() + runner.registerAsRepoRunner(t, baseRepo.OwnerName, baseRepo.Name, "mock-runner", []string{"ubuntu-latest"}) + + // init the workflow + wfTreePath := ".gitea/workflows/pull.yml" + wfFileContent := `name: Pull Request +on: pull_request +concurrency: + group: pull-request-test + cancel-in-progress: ${{ !startsWith(github.head_ref, 'do-not-cancel/') }} +jobs: + wf1-job: + runs-on: ubuntu-latest + steps: + - run: echo 'test the pull' +` + opts1 := getWorkflowCreateFileOptions(user2, baseRepo.DefaultBranch, fmt.Sprintf("create %s", wfTreePath), wfFileContent) + createWorkflowFile(t, user2Token, baseRepo.OwnerName, baseRepo.Name, wfTreePath, opts1) + // user2 creates a pull request + doAPICreateFile(user2APICtx, "user2-fix.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "bugfix/aaa", + Message: "create user2-fix.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user2-fix")), + })(t) + doAPICreatePullRequest(user2APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, "bugfix/aaa")(t) + pr1Task1 := runner.fetchTask(t) + _, _, pr1Run1 := getTaskAndJobAndRunByTaskID(t, pr1Task1.Id) + assert.Equal(t, "pull-request-test", pr1Run1.ConcurrencyGroup) + assert.True(t, pr1Run1.ConcurrencyCancel) + assert.True(t, pr1Run1.Status.IsRunning()) + + // user4 forks the repo + req := NewRequestWithJSON(t, "POST", fmt.Sprintf("/api/v1/repos/%s/%s/forks", baseRepo.OwnerName, baseRepo.Name), + &api.CreateForkOption{ + Name: util.ToPointer("actions-concurrency-fork"), + }).AddTokenAuth(user4Token) + resp := MakeRequest(t, req, http.StatusAccepted) + var apiForkRepo api.Repository + DecodeJSON(t, resp, &apiForkRepo) + forkRepo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiForkRepo.ID}) + user4APICtx := NewAPITestContext(t, user4.Name, forkRepo.Name, auth_model.AccessTokenScopeWriteRepository) + + // user4 creates a pull request from branch "bugfix/bbb" + doAPICreateFile(user4APICtx, "user4-fix.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "bugfix/bbb", + Message: "create user4-fix.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix")), + })(t) + doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, fmt.Sprintf("%s:bugfix/bbb", user4.Name))(t) + // cannot fetch the task because an approval is required + runner.fetchNoTask(t) + // user2 approves the run + pr2Run1 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: baseRepo.ID, TriggerUserID: user4.ID}) + req = NewRequestWithValues(t, "POST", + fmt.Sprintf("/%s/%s/actions/runs/%d/approve", baseRepo.OwnerName, baseRepo.Name, pr2Run1.Index), + map[string]string{ + "_csrf": GetUserCSRFToken(t, user2Session), + }) + user2Session.MakeRequest(t, req, http.StatusOK) + // fetch the task and the previous task has been cancelled + pr2Task1 := runner.fetchTask(t) + _, _, pr2Run1 = getTaskAndJobAndRunByTaskID(t, pr2Task1.Id) + assert.Equal(t, "pull-request-test", pr2Run1.ConcurrencyGroup) + assert.True(t, pr2Run1.ConcurrencyCancel) + assert.True(t, pr2Run1.Status.IsRunning()) + pr1Run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: pr1Run1.ID}) + assert.True(t, pr1Run1.Status.IsCancelled()) + + // user4 creates another pull request from branch "do-not-cancel/ccc" + doAPICreateFile(user4APICtx, "user4-fix2.txt", &api.CreateFileOptions{ + FileOptions: api.FileOptions{ + NewBranchName: "do-not-cancel/ccc", + Message: "create user4-fix2.txt", + Author: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Committer: api.Identity{ + Name: user4.Name, + Email: user4.Email, + }, + Dates: api.CommitDateOptions{ + Author: time.Now(), + Committer: time.Now(), + }, + }, + ContentBase64: base64.StdEncoding.EncodeToString([]byte("user4-fix2")), + })(t) + doAPICreatePullRequest(user4APICtx, baseRepo.OwnerName, baseRepo.Name, baseRepo.DefaultBranch, fmt.Sprintf("%s:do-not-cancel/ccc", user4.Name))(t) + // cannot fetch the task because cancel-in-progress is false + runner.fetchNoTask(t) + runner.execTask(t, pr2Task1, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + pr2Run1 = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: pr2Run1.ID}) + assert.True(t, pr2Run1.Status.IsSuccess()) + // fetch the task + pr3Task1 := runner.fetchTask(t) + _, _, pr3Run1 := getTaskAndJobAndRunByTaskID(t, pr3Task1.Id) + assert.Equal(t, "pull-request-test", pr3Run1.ConcurrencyGroup) + assert.False(t, pr3Run1.ConcurrencyCancel) + assert.True(t, pr3Run1.Status.IsRunning()) + + doAPIDeleteRepository(user4APICtx)(t) + doAPIDeleteRepository(user2APICtx)(t) + }) +} + +func TestJobConcurrency(t *testing.T) { + onGiteaRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, user2.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false) + repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID}) + runner1 := newMockRunner() + runner1.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-1", []string{"runner1"}) + runner2 := newMockRunner() + runner2.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner-2", []string{"runner2"}) + + // add a variable for test + req := NewRequestWithJSON(t, "POST", + fmt.Sprintf("/api/v1/repos/%s/%s/actions/variables/version_var", user2.Name, repo.Name), &api.CreateVariableOption{ + Value: "v1.23.0", + }). + AddTokenAuth(token) + MakeRequest(t, req, http.StatusNoContent) + + wf1TreePath := ".gitea/workflows/concurrent-workflow-1.yml" + wf1FileContent := `name: concurrent-workflow-1 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-1.yml' +jobs: + wf1-job1: + runs-on: runner1 + concurrency: + group: job-main-${{ vars.version_var }} + steps: + - run: echo 'wf1-job1' +` + wf2TreePath := ".gitea/workflows/concurrent-workflow-2.yml" + wf2FileContent := `name: concurrent-workflow-2 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-2.yml' +jobs: + wf2-job1: + runs-on: runner2 + outputs: + version: ${{ steps.version_step.outputs.app_version }} + steps: + - id: version_step + run: echo "app_version=v1.23.0" >> "$GITHUB_OUTPUT" + wf2-job2: + runs-on: runner1 + needs: [wf2-job1] + concurrency: + group: job-main-${{ needs.wf2-job1.outputs.version }} + steps: + - run: echo 'wf2-job2' +` + wf3TreePath := ".gitea/workflows/concurrent-workflow-3.yml" + wf3FileContent := `name: concurrent-workflow-3 +on: + push: + paths: + - '.gitea/workflows/concurrent-workflow-3.yml' +jobs: + wf3-job1: + runs-on: runner1 + concurrency: + group: job-main-${{ vars.version_var }} + cancel-in-progress: ${{ vars.version_var == 'v1.23.0' }} + steps: + - run: echo 'wf3-job1' +` + + opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf1TreePath), wf1FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1) + opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf2TreePath), wf2FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2) + + // fetch wf1-job1 + wf1Job1Task := runner1.fetchTask(t) + _, wf1Job1ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf1Job1Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf1Job1ActionJob.ConcurrencyGroup) + assert.True(t, wf1Job1ActionJob.Status.IsRunning()) + // fetch and exec wf2-job1 + wf2Job1Task := runner2.fetchTask(t) + runner2.execTask(t, wf2Job1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + outputs: map[string]string{ + "version": "v1.23.0", + }, + }) + // cannot fetch wf2-job2 because wf1-job1 is running + runner1.fetchNoTask(t) + // exec wf1-job1 + runner1.execTask(t, wf1Job1Task, &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + // fetch wf2-job2 + wf2Job2Task := runner1.fetchTask(t) + _, wf2Job2ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf2Job2Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf2Job2ActionJob.ConcurrencyGroup) + assert.True(t, wf2Job2ActionJob.Status.IsRunning()) + // push workflow3 to trigger wf3-job1 + opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, fmt.Sprintf("create %s", wf3TreePath), wf3FileContent) + createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3) + // fetch wf3-job1 + wf3Job1Task := runner1.fetchTask(t) + _, wf3Job1ActionJob, _ := getTaskAndJobAndRunByTaskID(t, wf3Job1Task.Id) + assert.Equal(t, "job-main-v1.23.0", wf3Job1ActionJob.ConcurrencyGroup) + assert.True(t, wf3Job1ActionJob.Status.IsRunning()) + // wf2-job2 has been cancelled + _, wf2Job2ActionJob, _ = getTaskAndJobAndRunByTaskID(t, wf2Job2Task.Id) + assert.True(t, wf2Job2ActionJob.Status.IsCancelled()) + + httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository) + doAPIDeleteRepository(httpContext)(t) + }) +} + +func getTaskAndJobAndRunByTaskID(t *testing.T, taskID int64) (*actions_model.ActionTask, *actions_model.ActionRunJob, *actions_model.ActionRun) { + actionTask := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionTask{ID: taskID}) + actionRunJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: actionTask.JobID}) + actionRun := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: actionRunJob.RunID}) + return actionTask, actionRunJob, actionRun +} diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index 355ea1705e23c..b8afb54068d0c 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -92,7 +92,20 @@ func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, run } func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task { - fetchTimeout := 10 * time.Second + task := r.tryFetchTask(t, timeout...) + assert.NotNil(t, task, "failed to fetch a task") + return task +} + +func (r *mockRunner) fetchNoTask(t *testing.T, timeout ...time.Duration) { + task := r.tryFetchTask(t, timeout...) + assert.Nil(t, task, "a task is fetched") +} + +const defaultFetchTaskTimeout = 5 * time.Second + +func (r *mockRunner) tryFetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task { + fetchTimeout := defaultFetchTaskTimeout if len(timeout) > 0 { fetchTimeout = timeout[0] } @@ -109,7 +122,7 @@ func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1 } time.Sleep(time.Second) } - assert.NotNil(t, task, "failed to fetch a task") + return task } From 81a7899fde0b0decc39a2a67463de36a94d7a79b Mon Sep 17 00:00:00 2001 From: Zettat123 Date: Thu, 16 Jan 2025 10:42:51 +0800 Subject: [PATCH 2/2] fix schedule --- services/actions/schedule_tasks.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/services/actions/schedule_tasks.go b/services/actions/schedule_tasks.go index 4f6cc47036ff1..afa6aac7f4c43 100644 --- a/services/actions/schedule_tasks.go +++ b/services/actions/schedule_tasks.go @@ -143,6 +143,20 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule) if err != nil { return err } + wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(cron.Content) + if err != nil { + return err + } + if wfRawConcurrency != nil { + wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(run, wfRawConcurrency, vars) + if err != nil { + return err + } + if wfConcurrencyGroup != "" { + run.ConcurrencyGroup = wfConcurrencyGroup + run.ConcurrencyCancel = wfConcurrencyCancel + } + } // Insert the action run and its associated jobs into the database if err := InsertRun(ctx, run, workflows); err != nil {