Skip to content

Commit

Permalink
refactor: long polling for runner jobs (#1727)
Browse files · Browse the repository at this point in the history
Signed-off-by: Ivan Dagelic <[email protected]>
  • Loading branch information
idagelic authored Jan 17, 2025
1 parent 5d1490e commit c213b32
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 53 deletions.
36 changes: 30 additions & 6 deletions pkg/api/controllers/runner/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ package runner
import (
"fmt"
"net/http"
"time"

"github.com/daytonaio/daytona/pkg/server"
"github.com/daytonaio/daytona/pkg/services"
"github.com/daytonaio/daytona/pkg/stores"
"github.com/gin-gonic/gin"
)

const LONG_POLL_TIMEOUT = 1 * time.Minute
const LONG_POLL_INTERVAL = 50 * time.Millisecond

// ListRunnerJobs godoc
//
// @Tags runner
Expand All @@ -29,13 +33,33 @@ func ListRunnerJobs(ctx *gin.Context) {

server := server.GetInstance(nil)

jobs, err := server.RunnerService.ListRunnerJobs(ctx.Request.Context(), runnerId)
if err != nil {
ctx.AbortWithError(http.StatusInternalServerError, fmt.Errorf("failed to get runner: %w", err))
return
}
timeout := time.After(LONG_POLL_TIMEOUT)
ticker := time.NewTicker(LONG_POLL_INTERVAL)
defer ticker.Stop()

ctx.JSON(200, jobs)
for {
select {
case <-ctx.Request.Context().Done():
// Handle client cancelling the request
ctx.AbortWithStatus(http.StatusRequestTimeout)
return
case <-timeout:
// Handle request timing out
ctx.JSON(http.StatusNoContent, nil)
return
case <-ticker.C:
// Check for new jobs
jobs, err := server.RunnerService.ListRunnerJobs(ctx.Request.Context(), runnerId)
if err != nil {
ctx.AbortWithError(http.StatusInternalServerError, fmt.Errorf("failed to get runner jobs: %w", err))
return
}
if len(jobs) > 0 {
ctx.JSON(http.StatusOK, jobs)
return
}
}
}
}

// UpdateJobState godoc
Expand Down
6 changes: 3 additions & 3 deletions pkg/cmd/server/runner/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var deleteCmd = &cobra.Command{
return nil
}

selectedRunner, err := runner.GetRunnerFromPrompt(runners, activeProfile.Name, "Unregister")
selectedRunner, err := runner.GetRunnerFromPrompt(runners, activeProfile.Name, "Delete")
if err != nil {
if common.IsCtrlCAbort(err) {
return nil
Expand All @@ -80,7 +80,7 @@ var deleteCmd = &cobra.Command{
form := huh.NewForm(
huh.NewGroup(
huh.NewConfirm().
Title(fmt.Sprintf("Unregister runner %s?", selectedRunnerId)).
Title(fmt.Sprintf("Delete runner %s?", selectedRunnerId)).
Description("It is recommended that you remove all target configs, targets and workspaces associated with it.").
Value(&confirm),
),
Expand All @@ -97,7 +97,7 @@ var deleteCmd = &cobra.Command{
return apiclient_util.HandleErrorResponse(res, err)
}

views.RenderInfoMessageBold(fmt.Sprintf("Runner %s unregistered successfully", selectedRunnerId))
views.RenderInfoMessageBold(fmt.Sprintf("Runner %s deleted successfully", selectedRunnerId))
return nil
},
}
Expand Down
75 changes: 33 additions & 42 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,12 @@ import (
"github.com/daytonaio/daytona/pkg/jobs/workspace"
"github.com/daytonaio/daytona/pkg/models"
"github.com/daytonaio/daytona/pkg/runner/providermanager"
"github.com/daytonaio/daytona/pkg/scheduler"
"github.com/daytonaio/daytona/pkg/telemetry"
"github.com/gin-gonic/gin"
"github.com/hashicorp/go-plugin"
log "github.com/sirupsen/logrus"
)

// TODO: add lock when running interval func
// 1 second interval
const DEFAULT_JOB_POLL_INTERVAL = "*/1 * * * * *"

const RUNNER_METADATA_UPDATE_INTERVAL = 2 * time.Second

type IRunner interface {
Expand Down Expand Up @@ -155,21 +150,16 @@ func (r *Runner) Start(ctx context.Context) error {
return err
}

scheduler := scheduler.NewCronScheduler()

err = scheduler.AddFunc(DEFAULT_JOB_POLL_INTERVAL, func() {
err := r.CheckAndRunJobs(ctx)
if err != nil {
r.logger.Error(err)
go func() {
for {
err := r.CheckAndRunJobs(ctx)
if err != nil {
r.logger.Error(err)
// Handle the function continuously erroring (e.g. authentication)
time.Sleep(3 * time.Second)
}
}
})
if err != nil {
return err
}

scheduler.Start()

r.logger.Info("Runner started")
}()

go func() {
for {
Expand All @@ -178,15 +168,15 @@ func (r *Runner) Start(ctx context.Context) error {
}
}()

r.logger.Info("Runner started")

select {
case err = <-routerErrChan:
case <-ctx.Done():
err = nil
}

r.logger.Info("Shutting down runner")
scheduler.Stop()

return err
}

Expand All @@ -199,12 +189,20 @@ func (r *Runner) CheckAndRunJobs(ctx context.Context) error {
return err
}

// goroutines, sync group
for _, job := range jobs {
err = r.runJob(ctx, job)
for _, j := range jobs {
if j.State != models.JobStatePending {
continue
}

j.State = models.JobStateRunning
err := r.updateJobState(ctx, j.Id, models.JobStateRunning, nil)
if err != nil {
return err
r.logger.Trace(err)
continue
}

r.logJobStateUpdate(j, nil)
go r.runJob(ctx, j)
}

return nil
Expand All @@ -214,7 +212,7 @@ func (r *Runner) Purge(ctx context.Context) error {
return r.providerManager.Purge()
}

func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
func (r *Runner) runJob(ctx context.Context, j *models.Job) {
startTime := time.Now()
if r.Config.TelemetryEnabled {
event := telemetry.NewJobEvent(telemetry.JobEventRunStarted, j, nil, nil)
Expand All @@ -226,14 +224,6 @@ func (r *Runner) runJob(ctx context.Context, j *models.Job) error {

var job jobs.IJob

j.State = models.JobStateRunning
err := r.updateJobState(ctx, j.Id, models.JobStateRunning, nil)
if err != nil {
return r.handleRunFailed(j, err, startTime)
}

r.logJobStateUpdate(j, nil)

switch j.ResourceType {
case models.ResourceTypeWorkspace:
job = r.workspaceJobFactory.Create(*j)
Expand All @@ -244,19 +234,19 @@ func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
case models.ResourceTypeRunner:
job = r.runnerJobFactory.Create(*j)
default:
return errors.New("invalid resource type for job")
r.handleRunFailed(j, errors.New("invalid resource type for job"), startTime)
}

err = job.Execute(ctx)
err := job.Execute(ctx)
if err != nil {
return r.handleRunFailed(j, err, startTime)
r.handleRunFailed(j, err, startTime)
}

j.State = models.JobStateSuccess
r.logJobStateUpdate(j, nil)
err = r.updateJobState(ctx, j.Id, models.JobStateSuccess, nil)
if err != nil {
return r.handleRunFailed(j, err, startTime)
r.handleRunFailed(j, err, startTime)
}

if r.Config.TelemetryEnabled {
Expand All @@ -267,8 +257,6 @@ func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
r.logger.Trace(err)
}
}

return nil
}

// Runner uptime in seconds
Expand Down Expand Up @@ -325,7 +313,7 @@ func (r *Runner) logJobStateUpdate(j *models.Job, err error) {
r.logger.Info(message)
}

func (r *Runner) handleRunFailed(j *models.Job, err error, startTime time.Time) error {
func (r *Runner) handleRunFailed(j *models.Job, err error, startTime time.Time) {
j.State = models.JobStateError
r.logJobStateUpdate(j, err)

Expand All @@ -338,5 +326,8 @@ func (r *Runner) handleRunFailed(j *models.Job, err error, startTime time.Time)
}
}

return r.updateJobState(context.Background(), j.Id, models.JobStateError, err)
updateJobErr := r.updateJobState(context.Background(), j.Id, models.JobStateError, err)
if updateJobErr != nil {
r.logger.Error(updateJobErr)
}
}
7 changes: 5 additions & 2 deletions pkg/server/workspacetemplates/prebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (

"github.com/daytonaio/daytona/pkg/gitprovider"
"github.com/daytonaio/daytona/pkg/models"
"github.com/daytonaio/daytona/pkg/runner"
"github.com/daytonaio/daytona/pkg/scheduler"
"github.com/daytonaio/daytona/pkg/services"
"github.com/daytonaio/daytona/pkg/stores"
log "github.com/sirupsen/logrus"
)

// TODO: add lock when running interval func
// 1 second interval
const DEFAULT_RETENTION_POLL_INTERVAL = "*/1 * * * * *"

func (s *WorkspaceTemplateService) FindPrebuild(ctx context.Context, workspaceTemplateFilter *stores.WorkspaceTemplateFilter, prebuildFilter *stores.PrebuildFilter) (*services.PrebuildDTO, error) {
wt, err := s.templateStore.Find(ctx, workspaceTemplateFilter)
if err != nil {
Expand Down Expand Up @@ -179,7 +182,7 @@ func (s *WorkspaceTemplateService) EnforceRetentionPolicy(ctx context.Context) e
func (s *WorkspaceTemplateService) StartRetentionPoller(ctx context.Context) error {
scheduler := scheduler.NewCronScheduler()

err := scheduler.AddFunc(runner.DEFAULT_JOB_POLL_INTERVAL, func() {
err := scheduler.AddFunc(DEFAULT_RETENTION_POLL_INTERVAL, func() {
err := s.EnforceRetentionPolicy(ctx)
if err != nil {
log.Error(err)
Expand Down

0 comments on commit c213b32

Please sign in to comment.