refactor: long polling for runner jobs #1727

Merged · 2 commits · Jan 17, 2025

36 changes: 30 additions & 6 deletions pkg/api/controllers/runner/jobs.go
```diff
@@ -6,13 +6,17 @@ package runner
 import (
 	"fmt"
 	"net/http"
+	"time"
 
 	"github.com/daytonaio/daytona/pkg/server"
 	"github.com/daytonaio/daytona/pkg/services"
 	"github.com/daytonaio/daytona/pkg/stores"
 	"github.com/gin-gonic/gin"
 )
 
+const LONG_POLL_TIMEOUT = 1 * time.Minute
+const LONG_POLL_INTERVAL = 50 * time.Millisecond
+
 // ListRunnerJobs godoc
 //
 // @Tags runner
@@ -29,13 +33,33 @@ func ListRunnerJobs(ctx *gin.Context) {
 
 	server := server.GetInstance(nil)
 
-	jobs, err := server.RunnerService.ListRunnerJobs(ctx.Request.Context(), runnerId)
-	if err != nil {
-		ctx.AbortWithError(http.StatusInternalServerError, fmt.Errorf("failed to get runner: %w", err))
-		return
-	}
+	timeout := time.After(LONG_POLL_TIMEOUT)
+	ticker := time.NewTicker(LONG_POLL_INTERVAL)
+	defer ticker.Stop()
 
-	ctx.JSON(200, jobs)
+	for {
+		select {
+		case <-ctx.Request.Context().Done():
+			// Handle client cancelling the request
+			ctx.AbortWithStatus(http.StatusRequestTimeout)
+			return
+		case <-timeout:
+			// Handle request timing out
+			ctx.JSON(http.StatusNoContent, nil)
+			return
+		case <-ticker.C:
+			// Check for new jobs
+			jobs, err := server.RunnerService.ListRunnerJobs(ctx.Request.Context(), runnerId)
+			if err != nil {
+				ctx.AbortWithError(http.StatusInternalServerError, fmt.Errorf("failed to get runner jobs: %w", err))
+				return
+			}
+			if len(jobs) > 0 {
+				ctx.JSON(http.StatusOK, jobs)
+				return
+			}
+		}
+	}
 }
 
 // UpdateJobState godoc
```
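
For a sense of how this behaves end to end: the handler checks the store every 50 ms and holds the request open for up to a minute, so a consumer simply re-issues the request as soon as each poll resolves. Below is a minimal client-side sketch; the endpoint URL, the `Job` shape, and the absence of auth are illustrative assumptions, not the project's actual API client.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// Job mirrors only the fields this sketch needs.
type Job struct {
	Id    string `json:"id"`
	State string `json:"state"`
}

// pollJobs performs one long poll: 200 returns a batch of jobs,
// 204 means the server timed out with nothing to hand out.
func pollJobs(client *http.Client, url string) ([]Job, error) {
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		var jobs []Job
		return jobs, json.NewDecoder(resp.Body).Decode(&jobs)
	case http.StatusNoContent:
		return nil, nil // LONG_POLL_TIMEOUT elapsed with no pending jobs
	default:
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
}

func main() {
	// The client's timeout must exceed the server's LONG_POLL_TIMEOUT
	// (1 minute), or requests get cancelled before the server replies.
	client := &http.Client{Timeout: 90 * time.Second}
	url := "http://localhost:3986/runner/example-runner-id/jobs" // hypothetical

	for {
		jobs, err := pollJobs(client, url)
		if err != nil {
			time.Sleep(3 * time.Second) // back off instead of hot-looping on errors
			continue
		}
		for _, j := range jobs {
			fmt.Println("received job", j.Id)
		}
	}
}
```

Note that a client cancellation surfaces server-side as `ctx.Request.Context().Done()`, which is why the handler's first `select` case exists.
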
6 changes: 3 additions & 3 deletions pkg/cmd/server/runner/delete.go
```diff
@@ -56,7 +56,7 @@ var deleteCmd = &cobra.Command{
 			return nil
 		}
 
-		selectedRunner, err := runner.GetRunnerFromPrompt(runners, activeProfile.Name, "Unregister")
+		selectedRunner, err := runner.GetRunnerFromPrompt(runners, activeProfile.Name, "Delete")
 		if err != nil {
 			if common.IsCtrlCAbort(err) {
 				return nil
@@ -80,7 +80,7 @@ var deleteCmd = &cobra.Command{
 		form := huh.NewForm(
 			huh.NewGroup(
 				huh.NewConfirm().
-					Title(fmt.Sprintf("Unregister runner %s?", selectedRunnerId)).
+					Title(fmt.Sprintf("Delete runner %s?", selectedRunnerId)).
					Description("It is recommended that you remove all target configs, targets and workspaces associated with it.").
					Value(&confirm),
			),
@@ -97,7 +97,7 @@ var deleteCmd = &cobra.Command{
 			return apiclient_util.HandleErrorResponse(res, err)
 		}
 
-		views.RenderInfoMessageBold(fmt.Sprintf("Runner %s unregistered successfully", selectedRunnerId))
+		views.RenderInfoMessageBold(fmt.Sprintf("Runner %s deleted successfully", selectedRunnerId))
 		return nil
 	},
 }
```

75 changes: 33 additions & 42 deletions pkg/runner/runner.go
Expand Up @@ -23,17 +23,12 @@ import (
"github.com/daytonaio/daytona/pkg/jobs/workspace"
"github.com/daytonaio/daytona/pkg/models"
"github.com/daytonaio/daytona/pkg/runner/providermanager"
"github.com/daytonaio/daytona/pkg/scheduler"
"github.com/daytonaio/daytona/pkg/telemetry"
"github.com/gin-gonic/gin"
"github.com/hashicorp/go-plugin"
log "github.com/sirupsen/logrus"
)

// TODO: add lock when running interval func
// 1 second interval
const DEFAULT_JOB_POLL_INTERVAL = "*/1 * * * * *"

const RUNNER_METADATA_UPDATE_INTERVAL = 2 * time.Second

type IRunner interface {
Expand Down Expand Up @@ -155,21 +150,16 @@ func (r *Runner) Start(ctx context.Context) error {
return err
}

scheduler := scheduler.NewCronScheduler()

err = scheduler.AddFunc(DEFAULT_JOB_POLL_INTERVAL, func() {
err := r.CheckAndRunJobs(ctx)
if err != nil {
r.logger.Error(err)
go func() {
for {
err := r.CheckAndRunJobs(ctx)
if err != nil {
r.logger.Error(err)
// Handle the function continuously erroring (e.g. authentication)
time.Sleep(3 * time.Second)
}
}
})
if err != nil {
return err
}

scheduler.Start()

r.logger.Info("Runner started")
}()

go func() {
for {
Expand All @@ -178,15 +168,15 @@ func (r *Runner) Start(ctx context.Context) error {
}
}()

r.logger.Info("Runner started")

select {
case err = <-routerErrChan:
case <-ctx.Done():
err = nil
}

r.logger.Info("Shutting down runner")
scheduler.Stop()

return err
}

Expand All @@ -199,12 +189,20 @@ func (r *Runner) CheckAndRunJobs(ctx context.Context) error {
return err
}

// goroutines, sync group
for _, job := range jobs {
err = r.runJob(ctx, job)
for _, j := range jobs {
if j.State != models.JobStatePending {
continue
}

j.State = models.JobStateRunning
err := r.updateJobState(ctx, j.Id, models.JobStateRunning, nil)
if err != nil {
return err
r.logger.Trace(err)
continue
}

r.logJobStateUpdate(j, nil)
go r.runJob(ctx, j)
}

return nil
Expand All @@ -214,7 +212,7 @@ func (r *Runner) Purge(ctx context.Context) error {
return r.providerManager.Purge()
}

func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
func (r *Runner) runJob(ctx context.Context, j *models.Job) {
startTime := time.Now()
if r.Config.TelemetryEnabled {
event := telemetry.NewJobEvent(telemetry.JobEventRunStarted, j, nil, nil)
Expand All @@ -226,14 +224,6 @@ func (r *Runner) runJob(ctx context.Context, j *models.Job) error {

var job jobs.IJob

j.State = models.JobStateRunning
err := r.updateJobState(ctx, j.Id, models.JobStateRunning, nil)
if err != nil {
return r.handleRunFailed(j, err, startTime)
}

r.logJobStateUpdate(j, nil)

switch j.ResourceType {
case models.ResourceTypeWorkspace:
job = r.workspaceJobFactory.Create(*j)
Expand All @@ -244,19 +234,19 @@ func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
case models.ResourceTypeRunner:
job = r.runnerJobFactory.Create(*j)
default:
return errors.New("invalid resource type for job")
r.handleRunFailed(j, errors.New("invalid resource type for job"), startTime)
}

err = job.Execute(ctx)
err := job.Execute(ctx)
if err != nil {
return r.handleRunFailed(j, err, startTime)
r.handleRunFailed(j, err, startTime)
}

j.State = models.JobStateSuccess
r.logJobStateUpdate(j, nil)
err = r.updateJobState(ctx, j.Id, models.JobStateSuccess, nil)
if err != nil {
return r.handleRunFailed(j, err, startTime)
r.handleRunFailed(j, err, startTime)
}

if r.Config.TelemetryEnabled {
Expand All @@ -267,8 +257,6 @@ func (r *Runner) runJob(ctx context.Context, j *models.Job) error {
r.logger.Trace(err)
}
}

return nil
}

// Runner uptime in seconds
Expand Down Expand Up @@ -325,7 +313,7 @@ func (r *Runner) logJobStateUpdate(j *models.Job, err error) {
r.logger.Info(message)
}

func (r *Runner) handleRunFailed(j *models.Job, err error, startTime time.Time) error {
func (r *Runner) handleRunFailed(j *models.Job, err error, startTime time.Time) {
j.State = models.JobStateError
r.logJobStateUpdate(j, err)

Expand All @@ -338,5 +326,8 @@ func (r *Runner) handleRunFailed(j *models.Job, err error, startTime time.Time)
}
}

return r.updateJobState(context.Background(), j.Id, models.JobStateError, err)
updateJobErr := r.updateJobState(context.Background(), j.Id, models.JobStateError, err)
if updateJobErr != nil {
r.logger.Error(updateJobErr)
}
}
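
A design note on the loop above: `CheckAndRunJobs` now persists the `JobStateRunning` transition *before* spawning the job goroutine, and skips anything that is no longer `JobStatePending`. That ordering is what prevents a job from being executed twice when the next poll returns it again before the first goroutine finishes. A stripped-down sketch of the claim-then-dispatch pattern, with illustrative types rather than the project's own:

```go
package main

import (
	"fmt"
	"sync"
)

type JobState string

const (
	JobStatePending JobState = "pending"
	JobStateRunning JobState = "running"
)

type Job struct {
	Id    string
	State JobState
}

// dispatch claims each pending job synchronously, then runs it concurrently.
// claim stands in for persisting the state change (updateJobState above).
func dispatch(jobs []*Job, claim func(*Job) error, run func(*Job), wg *sync.WaitGroup) {
	for _, j := range jobs {
		if j.State != JobStatePending {
			continue // already claimed by an earlier poll iteration
		}
		if err := claim(j); err != nil {
			continue // claim failed; the job stays pending for the next poll
		}
		j.State = JobStateRunning
		wg.Add(1)
		go func(j *Job) {
			defer wg.Done()
			run(j)
		}(j)
	}
}

func main() {
	var wg sync.WaitGroup
	jobs := []*Job{
		{Id: "a", State: JobStatePending},
		{Id: "b", State: JobStateRunning}, // skipped: claimed previously
	}
	dispatch(jobs,
		func(j *Job) error { fmt.Println("claimed", j.Id); return nil },
		func(j *Job) { fmt.Println("running", j.Id) },
		&wg,
	)
	wg.Wait() // the demo waits; the real runner fires and forgets
}
```

Because `runJob` no longer returns an error to a caller, failures are reported exclusively through `handleRunFailed`, which logs and persists `JobStateError` instead of propagating.
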
7 changes: 5 additions & 2 deletions pkg/server/workspacetemplates/prebuild.go
```diff
@@ -10,13 +10,16 @@ import (
 
 	"github.com/daytonaio/daytona/pkg/gitprovider"
 	"github.com/daytonaio/daytona/pkg/models"
-	"github.com/daytonaio/daytona/pkg/runner"
 	"github.com/daytonaio/daytona/pkg/scheduler"
 	"github.com/daytonaio/daytona/pkg/services"
 	"github.com/daytonaio/daytona/pkg/stores"
 	log "github.com/sirupsen/logrus"
 )
 
+// TODO: add lock when running interval func
+// 1 second interval
+const DEFAULT_RETENTION_POLL_INTERVAL = "*/1 * * * * *"
+
 func (s *WorkspaceTemplateService) FindPrebuild(ctx context.Context, workspaceTemplateFilter *stores.WorkspaceTemplateFilter, prebuildFilter *stores.PrebuildFilter) (*services.PrebuildDTO, error) {
 	wt, err := s.templateStore.Find(ctx, workspaceTemplateFilter)
 	if err != nil {
@@ -179,7 +182,7 @@ func (s *WorkspaceTemplateService) EnforceRetentionPolicy(ctx context.Context) error {
 func (s *WorkspaceTemplateService) StartRetentionPoller(ctx context.Context) error {
 	scheduler := scheduler.NewCronScheduler()
 
-	err := scheduler.AddFunc(runner.DEFAULT_JOB_POLL_INTERVAL, func() {
+	err := scheduler.AddFunc(DEFAULT_RETENTION_POLL_INTERVAL, func() {
 		err := s.EnforceRetentionPolicy(ctx)
 		if err != nil {
 			log.Error(err)
```
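
One note on the schedule spec: `"*/1 * * * * *"` is a six-field cron expression whose leading field is seconds, so the retention poller fires once per second, just like the job poller it was split off from. A sketch of the semantics, assuming a seconds-aware cron library such as robfig/cron/v3 (whether `pkg/scheduler` actually wraps that library is an assumption here):

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	c := cron.New(cron.WithSeconds()) // enables the leading seconds field
	_, err := c.AddFunc("*/1 * * * * *", func() {
		fmt.Println("tick:", time.Now().Format("15:04:05"))
	})
	if err != nil {
		panic(err)
	}
	c.Start()
	defer c.Stop()

	time.Sleep(3 * time.Second) // observe roughly three ticks, then exit
}
```

The inherited "add lock when running interval func" TODO still applies: if `EnforceRetentionPolicy` ever takes longer than a second, invocations can overlap.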