Skip to content

Commit

Permalink
fix: manually scheduled task runs now run when expected (#23664)
Browse files Browse the repository at this point in the history
* fix: run manually scheduled tasks at their scheduled time

* fix: actually use it

* fix: get tests building

* fix: fix tests

* fix: lint
  • Loading branch information
jeffreyssmith2nd authored Oct 13, 2022
1 parent 34254ee commit a0f1184
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 15 deletions.
4 changes: 4 additions & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,10 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
combinedTaskService,
executor.WithFlagger(m.flagger),
)
err = executor.LoadExistingScheduleRuns(ctx)
if err != nil {
m.log.Fatal("could not load existing scheduled runs", zap.Error(err))
}
m.executor = executor
m.reg.MustRegister(executorMetrics.PrometheusCollectors()...)
schLogger := m.log.With(zap.String("service", "task-scheduler"))
Expand Down
9 changes: 9 additions & 0 deletions kv/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,15 @@ func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID platform
runBytes, err := bucket.Get(key)
if err != nil {
if IsNotFound(err) {
runs, err := s.manualRuns(ctx, tx, taskID)
for _, run := range runs {
if run.ID == runID {
return run, nil
}
}
if err != nil {
return nil, taskmodel.ErrRunNotFound
}
return nil, taskmodel.ErrRunNotFound
}
return nil, taskmodel.ErrUnexpectedTaskBucketErr(err)
Expand Down
18 changes: 12 additions & 6 deletions task/backend/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const DefaultLimit = 1000
// Executor is an abstraction of the task executor with only the functions needed by the coordinator
type Executor interface {
ManualRun(ctx context.Context, id platform.ID, runID platform.ID) (executor.Promise, error)
ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error
Cancel(ctx context.Context, runID platform.ID) error
}

Expand Down Expand Up @@ -149,7 +150,7 @@ func (c *Coordinator) TaskUpdated(ctx context.Context, from, to *taskmodel.Task)
return nil
}

//TaskDeleted asks the Scheduler to release the deleted task
// TaskDeleted asks the Scheduler to release the deleted task
func (c *Coordinator) TaskDeleted(ctx context.Context, id platform.ID) error {
tid := scheduler.ID(id)
if err := c.sch.Release(tid); err != nil && err != taskmodel.ErrTaskNotClaimed {
Expand All @@ -166,14 +167,19 @@ func (c *Coordinator) RunCancelled(ctx context.Context, runID platform.ID) error
return err
}

// RunForced speaks directly to the Executor to run a task immediately
// RunForced speaks directly to the Executor to run a task immediately, or schedule the run if `scheduledFor` is set.
func (c *Coordinator) RunForced(ctx context.Context, task *taskmodel.Task, run *taskmodel.Run) error {
// the returned promise is not used, since clients expect the HTTP server to return immediately after scheduling the
// task rather than waiting for the task to finish
_, err := c.ex.ManualRun(ctx, task.ID, run.ID)
var err error
if !run.ScheduledFor.IsZero() {
err = c.ex.ScheduleManualRun(ctx, task.ID, run.ID)
} else {
// the returned promise is not used, since clients expect the HTTP server to return immediately after scheduling the
// task rather than waiting for the task to finish
_, err = c.ex.ManualRun(ctx, task.ID, run.ID)
}

if err != nil {
return taskmodel.ErrRunExecutionError(err)
}

return nil
}
24 changes: 20 additions & 4 deletions task/backend/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ func Test_Coordinator_Executor_Methods(t *testing.T) {
taskOne = &taskmodel.Task{ID: one}

runOne = &taskmodel.Run{
ID: one,
TaskID: one,
ScheduledFor: time.Now(),
ID: one,
TaskID: one,
}

allowUnexported = cmp.AllowUnexported(executorE{}, schedulerC{}, SchedulableTask{})

scheduledTime = time.Now()
)

for _, test := range []struct {
Expand All @@ -45,7 +46,22 @@ func Test_Coordinator_Executor_Methods(t *testing.T) {
},
executor: &executorE{
calls: []interface{}{
manualRunCall{taskOne.ID, runOne.ID},
manualRunCall{taskOne.ID, runOne.ID, false},
},
},
},
{
name: "RunForcedScheduled",
call: func(t *testing.T, c *Coordinator) {
rr := runOne
rr.ScheduledFor = scheduledTime
if err := c.RunForced(context.Background(), taskOne, runOne); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
executor: &executorE{
calls: []interface{}{
manualRunCall{taskOne.ID, runOne.ID, true},
},
},
},
Expand Down
12 changes: 9 additions & 3 deletions task/backend/coordinator/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ type (
}

manualRunCall struct {
TaskID platform.ID
RunID platform.ID
TaskID platform.ID
RunID platform.ID
WasScheduled bool
}

cancelCallC struct {
Expand Down Expand Up @@ -96,7 +97,7 @@ func (s *schedulerC) Release(taskID scheduler.ID) error {
}

func (e *executorE) ManualRun(ctx context.Context, id platform.ID, runID platform.ID) (executor.Promise, error) {
e.calls = append(e.calls, manualRunCall{id, runID})
e.calls = append(e.calls, manualRunCall{id, runID, false})
ctx, cancel := context.WithCancel(ctx)
p := promise{
done: make(chan struct{}),
Expand All @@ -109,6 +110,11 @@ func (e *executorE) ManualRun(ctx context.Context, id platform.ID, runID platfor
return &p, err
}

func (e *executorE) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error {
e.calls = append(e.calls, manualRunCall{id, runID, true})
return nil
}

func (e *executorE) Cancel(ctx context.Context, runID platform.ID) error {
e.calls = append(e.calls, cancelCallC{runID})
return nil
Expand Down
134 changes: 132 additions & 2 deletions task/backend/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t
ps: us,

currentPromises: sync.Map{},
futurePromises: sync.Map{},
promiseQueue: make(chan *promise, maxPromises),
workerLimit: make(chan struct{}, cfg.maxWorkers),
limitFunc: func(*taskmodel.Task, *taskmodel.Run) error { return nil }, // noop
Expand All @@ -159,6 +160,8 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t
e: e,
}

go e.processScheduledTasks()

e.workerPool = sync.Pool{New: wm.new}
return e, e.metrics
}
Expand All @@ -177,6 +180,9 @@ type Executor struct {
// currentPromises are all the promises we are made that have not been fulfilled
currentPromises sync.Map

// futurePromises are promises that are scheduled to be executed in the future
futurePromises sync.Map

// keep a pool of promise's we have in queue
promiseQueue chan *promise

Expand All @@ -191,6 +197,52 @@ type Executor struct {
flagger feature.Flagger
}

func (e *Executor) LoadExistingScheduleRuns(ctx context.Context) error {
tasks, _, err := e.ts.FindTasks(ctx, taskmodel.TaskFilter{})
if err != nil {
e.log.Error("err finding tasks:", zap.Error(err))
return err
}
for _, t := range tasks {
beforeTime := time.Now().Add(time.Hour * 24 * 365).Format(time.RFC3339)
runs, _, err := e.ts.FindRuns(ctx, taskmodel.RunFilter{Task: t.ID, BeforeTime: beforeTime})
if err != nil {
e.log.Error("err finding runs:", zap.Error(err))
return err
}
for _, run := range runs {
if run.ScheduledFor.After(time.Now()) {
perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID)
if err != nil {
e.log.Error("err finding perms:", zap.Error(err))
return err
}

ctx, cancel := context.WithCancel(ctx)
// create promise
p := &promise{
run: run,
task: t,
auth: &influxdb.Authorization{
Status: influxdb.Active,
UserID: t.OwnerID,
ID: platform.ID(1),
OrgID: t.OrganizationID,
Permissions: perm,
},
createdAt: time.Now().UTC(),
done: make(chan struct{}),
ctx: ctx,
cancelFunc: cancel,
}
e.futurePromises.Store(run.ID, p)
}
}
}

return nil
}

// SetLimitFunc sets the limit func for this task executor
func (e *Executor) SetLimitFunc(l LimitFunc) {
e.limitFunc = l
Expand Down Expand Up @@ -241,6 +293,58 @@ func (e *Executor) ManualRun(ctx context.Context, id platform.ID, runID platform
return p, err
}

func (e *Executor) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error {
// create promises for any manual runs
r, err := e.tcs.StartManualRun(ctx, id, runID)
if err != nil {
return err
}

auth, err := icontext.GetAuthorizer(ctx)
if err != nil {
return err
}

// create a new context for running the task in the background so that returning the HTTP response does not cancel the
// context of the task to be run
ctx = icontext.SetAuthorizer(context.Background(), auth)

span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

t, err := e.ts.FindTaskByID(ctx, r.TaskID)
if err != nil {
return err
}

perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(ctx)
// create promise
p := &promise{
run: r,
task: t,
auth: &influxdb.Authorization{
Status: influxdb.Active,
UserID: t.OwnerID,
ID: platform.ID(1),
OrgID: t.OrganizationID,
Permissions: perm,
},
createdAt: time.Now().UTC(),
done: make(chan struct{}),
ctx: ctx,
cancelFunc: cancel,
}
e.metrics.manualRunsCounter.WithLabelValues(id.String()).Inc()

e.futurePromises.Store(runID, p)
return nil
}

func (e *Executor) ResumeCurrentRun(ctx context.Context, id platform.ID, runID platform.ID) (Promise, error) {
cr, err := e.tcs.CurrentlyRunning(ctx, id)
if err != nil {
Expand Down Expand Up @@ -363,6 +467,23 @@ func (e *Executor) createPromise(ctx context.Context, run *taskmodel.Run) (*prom
return p, nil
}

func (e *Executor) processScheduledTasks() {
t := time.Tick(1 * time.Second)
for range t {
e.futurePromises.Range(func(k any, v any) bool {
vv := v.(*promise)
if vv.run.ScheduledFor.Equal(time.Now()) || vv.run.ScheduledFor.Before(time.Now()) {
if vv.run.RunAt.IsZero() {
e.promiseQueue <- vv
e.futurePromises.Delete(k)
e.startWorker()
}
}
return true
})
}
}

type workerMaker struct {
e *Executor
}
Expand Down Expand Up @@ -445,9 +566,18 @@ func (w *worker) start(p *promise) {
defer span.Finish()

// add to run log
w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux))
if err := w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux)); err != nil {
tid := zap.String("taskID", p.task.ID.String())
rid := zap.String("runID", p.run.ID.String())
w.e.log.With(zap.Error(err)).With(tid).With(rid).Warn("error adding run log: ")
}

// update run status
w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), taskmodel.RunStarted)
if err := w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), taskmodel.RunStarted); err != nil {
tid := zap.String("taskID", p.task.ID.String())
rid := zap.String("runID", p.run.ID.String())
w.e.log.With(zap.Error(err)).With(tid).With(rid).Warn("error updating run state: ")
}

// add to metrics
w.e.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt))
Expand Down
4 changes: 4 additions & 0 deletions task/mock/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func (e *Executor) ManualRun(ctx context.Context, id platform.ID, runID platform
return p, err
}

func (e *Executor) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error {
return nil
}

func (e *Executor) Wait() {
e.wg.Wait()
}
Expand Down

0 comments on commit a0f1184

Please sign in to comment.