Skip to content

Commit

Permalink
Do not start jobs on wait list after restart
Browse files Browse the repository at this point in the history
  • Loading branch information
hlubek committed Jul 30, 2021
1 parent 80e5e14 commit 5c16f4b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 31 deletions.
46 changes: 19 additions & 27 deletions prunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,14 @@ func newPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, taskR
sched := taskctl.NewScheduler(taskRunner)

pRunner := &pipelineRunner{
defs: defs,
sched: sched,
taskRunner: taskRunner,
defs: defs,
sched: sched,
taskRunner: taskRunner,
// jobsByID contains ALL jobs, no matter whether they are on the waitlist or are scheduled or cancelled.
jobsByID: make(map[uuid.UUID]*pipelineJob),
// jobsByPipeline contains ALL jobs, no matter whether they are on the waitlist or are scheduled or cancelled.
jobsByPipeline: make(map[string][]*pipelineJob),
// waitListByPipeline additionally contains all the jobs currently waiting, but not yet started (because concurrency limits have been reached)
waitListByPipeline: make(map[string][]*pipelineJob),
store: store,
// Use channel buffered with one extra slot so we can keep save requests while a save is running without blocking
Expand Down Expand Up @@ -169,7 +172,7 @@ type pipelineRunner struct {
waitListByPipeline map[string][]*pipelineJob

// store is the implementation for persisting data
store dataStore
store dataStore
// persistRequests is for triggering saving-the-store, which is then handled asynchronously, at most every 3 seconds (see newPipelineRunner)
// externally, call requestPersist()
persistRequests chan struct{}
Expand All @@ -178,7 +181,7 @@ type pipelineRunner struct {
mx sync.RWMutex
}

// pipelineJob is a single execution context (a single run of a single pipeline). Can be scheduled (in the waitlistByPipeline of pipelineRunner),
// pipelineJob is a single execution context (a single run of a single pipeline). Can be scheduled (in the waitListByPipeline of pipelineRunner),
// or currently running (jobsByID / jobsByPipeline in pipelineRunner)
type pipelineJob struct {
ID uuid.UUID
Expand Down Expand Up @@ -694,22 +697,19 @@ func (r *pipelineRunner) InitialLoadFromStore() error {
Warnf("Found running job when restoring state, marked as canceled")
}

r.jobsByID[pJob.ID] = job
r.jobsByPipeline[pJob.Pipeline] = append(r.jobsByPipeline[pJob.Pipeline], job)
}

for pipeline, waitList := range data.WaitLists {
for _, jobID := range waitList {
job := r.jobsByID[jobID]
if job == nil {
log.Errorf("Job %s on wait list for pipeline %s was not defined", jobID, pipeline)
continue
}
// Cancel jobs which have been scheduled on wait list but never been started
if job.Start == nil {
job.Canceled = true

r.waitListByPipeline[pipeline] = append(r.waitListByPipeline[pipeline], job)
log.
WithField("component", "runner").
WithField("jobID", job.ID).
WithField("pipeline", job.Pipeline).
Warnf("Found job on wait list when restoring state, marked as canceled")
}

r.startJobsOnWaitList(pipeline)
r.jobsByID[pJob.ID] = job
r.jobsByPipeline[pJob.Pipeline] = append(r.jobsByPipeline[pJob.Pipeline], job)
}

return nil
Expand All @@ -722,8 +722,7 @@ func (r *pipelineRunner) SaveToStore() {

r.mx.RLock()
data := &persistedData{
Jobs: make([]persistedJob, 0, len(r.jobsByID)),
WaitLists: make(map[string][]uuid.UUID),
Jobs: make([]persistedJob, 0, len(r.jobsByID)),
}
for _, job := range r.jobsByID {
tasks := make([]persistedTask, len(job.Tasks))
Expand Down Expand Up @@ -755,13 +754,6 @@ func (r *pipelineRunner) SaveToStore() {
Tasks: tasks,
})
}
for pipeline, jobs := range r.waitListByPipeline {
waitList := make([]uuid.UUID, len(jobs))
for i, job := range jobs {
waitList[i] = job.ID
}
data.WaitLists[pipeline] = waitList
}
r.mx.RUnlock()

// We do not need to lock here, the single save loops guarantees non-concurrent saves
Expand Down
5 changes: 1 addition & 4 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type persistedTask struct {

type persistedData struct {
Jobs []persistedJob
WaitLists map[string][]uuid.UUID
}

type dataStore interface {
Expand Down Expand Up @@ -72,9 +71,7 @@ func newJSONDataStore(path string) (*jsonDataStore, error) {
func (j *jsonDataStore) Load() (*persistedData, error) {
f, err := os.Open(path.Join(j.path, "data.json"))
if errors.Is(err, os.ErrNotExist) {
return &persistedData{
WaitLists: make(map[string][]uuid.UUID),
}, nil
return &persistedData{}, nil
} else if err != nil {
return nil, errors.Wrap(err, "opening file")
}
Expand Down

0 comments on commit 5c16f4b

Please sign in to comment.