Skip to content

Commit

Permalink
Add queueing and replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
hlubek committed Jun 28, 2021
1 parent 78bf1b4 commit 4e8cfc6
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 40 deletions.
1 change: 1 addition & 0 deletions definition/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (d *PipelinesDef) Load(path string) error {
if err != nil {
return errors.Wrap(err, "decoding YAML")
}
localDef.setDefaults()

for pipelineName, pipelineDef := range localDef.Pipelines {
if p, exists := d.Pipelines[pipelineName]; exists {
Expand Down
56 changes: 52 additions & 4 deletions definition/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,77 @@ package definition

import (
"strings"

"github.com/friendsofgo/errors"
)

type TaskDef struct {
Script []string `yaml:"script"`
DependsOn []string `yaml:"depends_on"`
AllowFailure bool `yaml:"allow_failure"`
// Script is a list of shell commands that are executed for this task
Script []string `yaml:"script"`
// DependsOn is a list of task names this task depends on (must be finished before it can start)
DependsOn []string `yaml:"depends_on"`
// AllowFailure should be set, if the pipeline should continue event if this task had an error
AllowFailure bool `yaml:"allow_failure"`
}

type PipelineDef struct {
Concurrent bool `yaml:"concurrent"`
// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
Concurrency int `yaml:"concurrency"`
// QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil)
QueueLimit *int `yaml:"queue_limit"`
// QueueStrategy to use when adding jobs to the queue (defaults to append)
QueueStrategy QueueStrategy `yaml:"queue_strategy"`

Tasks map[string]TaskDef `yaml:"tasks"`

// SourcePath stores the source path where the pipeline was defined
SourcePath string
}

type QueueStrategy int

const (
// QueueStrategyAppend appends jobs to the queue until queue limit is reached
QueueStrategyAppend QueueStrategy = 0
// QueueStrategyReplace replaces pending jobs (with same variables) instead of appending to the queue
QueueStrategyReplace QueueStrategy = 1
)

func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
var strategyName string
err := unmarshal(&strategyName)
if err != nil {
return err
}

switch strategyName {
case "append":
*s = QueueStrategyAppend
case "replace":
*s = QueueStrategyReplace
default:
return errors.Errorf("unknown queue strategy: %q", strategyName)
}

return nil
}

type PipelinesMap map[string]PipelineDef

type PipelinesDef struct {
Pipelines PipelinesMap `yaml:"pipelines"`
}

func (d *PipelinesDef) setDefaults() {
for pipeline, pipelineDef := range d.Pipelines {
// Use concurrency of 1 by default (0 is zero value and makes no sense)
if pipelineDef.Concurrency == 0 {
pipelineDef.Concurrency = 1
d.Pipelines[pipeline] = pipelineDef
}
}
}

type KeyValue map[string]string

func (m PipelinesMap) NamesWithSourcePath() KeyValue {
Expand Down
30 changes: 30 additions & 0 deletions examples/pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,33 @@ pipelines:
no-work:
script:
- go for a walk

queue_it:
concurrency: 2
tasks:
lint:
script:
- echo "Starting something busy"
- sleep 3
- echo "25% done"
- sleep 2
- echo "50% done"
- sleep 4
- echo "75% done"
- sleep 1
- echo "100% done"

replace_it:
queue_strategy: replace
tasks:
lint:
script:
- echo "Starting something busy"
- sleep 2
- echo "25% done"
- sleep 3
- echo "50% done"
- sleep 4
- echo "75% done"
- sleep 2
- echo "100% done"
Loading

0 comments on commit 4e8cfc6

Please sign in to comment.