Skip to content

Commit

Permalink
refactor to use version in workflow spec
Browse files Browse the repository at this point in the history
  • Loading branch information
mfleader committed Oct 16, 2023
1 parent 263cfd1 commit 4876b17
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 44 deletions.
39 changes: 15 additions & 24 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"go.flow.arcalot.io/pluginsdk/schema"
)

var supportedVersions = map[string]struct{}{
"v0.1.0": {},
}

// WorkflowEngine is responsible for executing workflows and returning their result.
type WorkflowEngine interface {
// RunWorkflow is a simplified shortcut to parse and immediately run a workflow.
Expand Down Expand Up @@ -86,11 +90,11 @@ func (w workflowEngine) Parse(
return nil, err
}

v, err := SupportedApiVersion(wf.ApiVersion)
v, err := SupportedVersion(wf.Version)
if err != nil {
return nil, err
}
wf.ApiVersion = v
wf.Version = v

executor, err := workflow.NewExecutor(w.logger, w.config, w.stepRegistry)
if err != nil {
Expand All @@ -107,30 +111,17 @@ func (w workflowEngine) Parse(
}, nil
}

var supportedApiVersions = map[string]struct{}{
"0.1.0": struct{}{},
}

func SupportedApiVersion(apiVersion string) (string, error) {
//v, err := semver.NewVersion(apiVersion)
//if err != nil {
// return apiVersion, fmt.Errorf("invalid semantic versioning of apiVersion: %s", apiVersion)
//}
//_, ok := supportedApiVersions[v.String()]
//
//if !ok {
// return apiVersion, fmt.Errorf("unsupported workflow schema apiVersion: %s", apiVersion)
//}
//return v.String(), nil

// schema validation already covers regex for x.y.z format

_, ok := supportedApiVersions[apiVersion]

// SupportedVersion confirms whether a given version string
// is in the set of supported workflow specifications. It
// returns true when the version is in the set, false otherwise.
// Earlier schema validation already applies version's
// regular expression.
func SupportedVersion(version string) (string, error) {
_, ok := supportedVersions[version]
if !ok {
return apiVersion, fmt.Errorf("unsupported workflow schema apiVersion: %s", apiVersion)
return version, fmt.Errorf("unsupported workflow schema version: %s", version)
}
return apiVersion, nil
return version, nil
}

type engineWorkflow struct {
Expand Down
17 changes: 8 additions & 9 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import (
"go.flow.arcalot.io/engine/config"
)

func TestEngineWorkflow_ParseApiVersion(t *testing.T) {

_, err := engine.SupportedApiVersion("0.1.0")
func TestEngineWorkflow_ParseVersion(t *testing.T) {
_, err := engine.SupportedVersion("v0.1.0")
assert.NoError(t, err)

_, err = engine.SupportedApiVersion("0.2.0")
_, err = engine.SupportedVersion("v0.11.0")
assert.Error(t, err)
}

Expand Down Expand Up @@ -104,7 +102,7 @@ func TestEmptySteps(t *testing.T) {
context.Background(),
nil,
map[string][]byte{
"workflow.yaml": []byte(`apiVersion: 0.1.0
"workflow.yaml": []byte(`version: v0.1.0
output: []
steps: []`),
},
Expand All @@ -119,7 +117,8 @@ func TestNoSteps(t *testing.T) {
context.Background(),
nil,
map[string][]byte{
"workflow.yaml": []byte(`output: []`),
"workflow.yaml": []byte(`version: v0.1.0
output: []`),
},
"",
)
Expand All @@ -132,7 +131,7 @@ func TestE2E(t *testing.T) {
context.Background(),
[]byte(`name: Arca Lot`),
map[string][]byte{
"workflow.yaml": []byte(`apiVersion: 0.2.0
"workflow.yaml": []byte(`version: v0.1.0
input:
root: RootObject
objects:
Expand Down Expand Up @@ -163,7 +162,7 @@ func TestE2EMultipleOutputs(t *testing.T) {
context.Background(),
[]byte(`name: Arca Lot`),
map[string][]byte{
"workflow.yaml": []byte(`apiVersion: 0.1.0
"workflow.yaml": []byte(`version: v0.1.0
input:
root: RootObject
objects:
Expand Down
2 changes: 1 addition & 1 deletion workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

var sharedInputWorkflowYAML = `---
apiVersion: 0.1.0
version: v0.1.0
input:
root: RootObject
objects:
Expand Down
12 changes: 6 additions & 6 deletions workflow/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (

// Workflow is the primary data structure describing workflows.
type Workflow struct {
// ApiVersion determines which set of the arcaflow workflow external interface will be used in the workflow.
ApiVersion string `json:"apiVersion""`
// Version determines which set of the arcaflow workflow external interface will be used in the workflow.
Version string `json:"version"`
// Input describe the input schema for a workflow. These values can be referenced from expressions. The structure
// must be a scope described in primitive types. This is done so later on a forward reference to a step input can
// be used.
Expand Down Expand Up @@ -41,14 +41,14 @@ func getSchema() *schema.TypedScopeSchema[*Workflow] {
schema.NewStructMappedObjectSchema[*Workflow](
"Workflow",
map[string]*schema.PropertySchema{
"apiVersion": schema.NewPropertySchema(
"version": schema.NewPropertySchema(
schema.NewStringSchema(
schema.IntPointer(1),
schema.IntPointer(255),
regexp.MustCompile("^\\d+\\.\\d+\\.\\d+$")),
regexp.MustCompile(`^v\d+\.\d+\.\d+$`)),
schema.NewDisplayValue(
schema.PointerTo("ApiVersion"),
schema.PointerTo("Arcaflow Workflow Schema definition interface to be used."),
schema.PointerTo("Version"),
schema.PointerTo("Arcaflow Workflow specification version to be used."),
nil,
),
true,
Expand Down
8 changes: 4 additions & 4 deletions workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

var badWorkflowDefinition = `
apiVersion: 0.1.0
version: v0.1.0
input:
root: name
objects:
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestOutputFailed(t *testing.T) {
}

var stepCancellationWorkflowDefinition = `
apiVersion: 0.1.0
version: v0.1.0
input:
root: RootObject
objects:
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestStepCancellation(t *testing.T) {
}

var waitForSerialWorkflowDefinition = `
apiVersion: 0.1.0
version: v0.1.0
input:
root: RootObject
objects:
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestWaitForSerial(t *testing.T) {
// Running parallel steps which wait on the same previous step sometimes causes a race condition. This needs to be investigated.
// once the race condition if fixed reduce the wait_time to 500ms.
var waitForParallelWorkflowDefinition = `
apiVersion: 0.1.0
version: v0.1.0
input:
root: RootObject
objects:
Expand Down

0 comments on commit 4876b17

Please sign in to comment.