Workflow

Introduction

Workflow is an abstraction that defines a set of tasks and actions.

Task is an abstraction that logically groups one or more actions, for example init or test.

Action is an abstraction that defines a call to a service. An action does the actual job, such as starting a service or building and deploying an app.

ActionRequest is an abstraction that represents a service request.

ActionResponse is an abstraction that represents a service response.

To execute an action:

  1. The workflow service looks up a service by ID in the workflow manager registry.
  2. The workflow service creates a new request for the corresponding action on the selected service.
  3. Action.Request is expanded with context.State ($variable substitution) and converted to the service request struct.
  4. The service executes the operation for the provided request.
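
The four steps above can be sketched in Go as follows. This is only an illustration of the flow, not endly's implementation: the Service interface, printService, expandRequest and execute are hypothetical names, and the expansion here only handles values that are exactly "$key".

```go
package main

import (
	"fmt"
	"strings"
)

// Service is a hypothetical stand-in for a service registered with the workflow manager.
type Service interface {
	Run(action string, request map[string]interface{}) (map[string]interface{}, error)
}

// printService is a toy service that supports a single "print" action.
type printService struct{}

func (printService) Run(action string, request map[string]interface{}) (map[string]interface{}, error) {
	if action != "print" {
		return nil, fmt.Errorf("unsupported action: %v", action)
	}
	return map[string]interface{}{"Printed": request["message"]}, nil
}

// expandRequest replaces values that are exactly "$key" with state[key], when the key is present.
func expandRequest(request, state map[string]interface{}) map[string]interface{} {
	expanded := make(map[string]interface{}, len(request))
	for k, v := range request {
		expanded[k] = v
		if text, ok := v.(string); ok && strings.HasPrefix(text, "$") {
			if value, found := state[text[1:]]; found {
				expanded[k] = value
			}
		}
	}
	return expanded
}

// execute mirrors the steps: look up the service, expand the request, run the action.
func execute(registry map[string]Service, serviceID, action string, request, state map[string]interface{}) (map[string]interface{}, error) {
	service, ok := registry[serviceID]
	if !ok {
		return nil, fmt.Errorf("failed to lookup service: %v", serviceID)
	}
	return service.Run(action, expandRequest(request, state))
}

func main() {
	registry := map[string]Service{"workflow": printService{}}
	state := map[string]interface{}{"greeting": "hello"}
	request := map[string]interface{}{"message": "$greeting"}
	response, err := execute(registry, "workflow", "print", request, state)
	fmt.Println(response, err)
}
```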

Service is an abstraction that provides a set of capabilities triggered by a specified action/request.

To list the services supported by endly, run:

endly -s='*'

To list the actions supported by a service, run endly -s=[service name], e.g.:

endly -s='storage'

To list the request/response contract for a service action, run endly -s=[service name] -a=[action], e.g.:

endly -s='storage' -a='copy'

State is a key/value map that is used to manage state during the workflow run. The state can be changed in the init or post section of an action, task, or workflow node.

State is also a data substitution source with a rich expression language.

Workflow content and data structures can use a dollar sign '$' followed by a variable name; it is expanded to the corresponding state value if the key is present.
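
As a rough illustration of this substitution rule, the Go sketch below replaces each $name with its state value and leaves unknown names untouched. The expand function and the identifier pattern are assumptions made for this example; endly's own expression language is richer.

```go
package main

import (
	"fmt"
	"regexp"
)

// variablePattern matches a dollar sign followed by an identifier, e.g. $name.
var variablePattern = regexp.MustCompile(`\$([A-Za-z_][A-Za-z0-9_]*)`)

// expand replaces each $name with state[name]; names missing from the state are left as-is.
func expand(text string, state map[string]interface{}) string {
	return variablePattern.ReplaceAllStringFunc(text, func(match string) string {
		value, ok := state[match[1:]]
		if !ok {
			return match
		}
		return fmt.Sprint(value)
	})
}

func main() {
	state := map[string]interface{}{"dbname": "db1", "port": 5432}
	fmt.Println(expand("dbname=$dbname port=$port user=$user", state))
	// prints: dbname=db1 port=5432 user=$user
}
```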

Format

Inline Workflow

For simple sequential tasks, a workflow can be defined inline within a pipeline run request, e.g.:

@data.yaml

defaults:
  datastore: db1
pipeline:
  register:
    action: dsunit:register
    datastore: db1
    config:
      driverName: postgres
      descriptor: host=127.0.0.1 port=5432 user=[username] password=[password] dbname=[dbname]
        sslmode=disable
      credentials: $pgCredentials
      parameters:
        dbname: db1
  prepare:
    mapping:
      action: dsunit.mapping
      mappings:
      - URL: regression/db1/mapping.json
      post:
        tables: $Tables
    sequence:
      action: dsunit.sequence
      tables: $tables
      post:
      - seq = $Sequences
    data:
      action: nop
      init:
      - key = data.db.setup
      - dbSetup = $AsTableRecords($key)
    setup:
      action: dsunit:prepare
      URL: regression/db1/data/
      data: $dbSetup

Printing workflow model representation

endly -r=name -p   -f=yaml|json

Workflow data flow

Workflow arguments

To illustrate the data flow, assume that parameters p1 and p2 are supplied to a workflow. They can be accessed within the workflow, its tasks, or its actions via the following:

  • $params.p1
  • $params.p2

A test workflow can be invoked by one of the following methods:

  1. Command line:
endly -w=test p1=val1 p2=val2
  2. Single workflow run request:
endly -r=run

@run.yaml

Name: test
Params:
  p1: val1
  p2: val2

  3. Inline workflow run request:
endly -r=run p2=val2

@run.yaml

params:
  p1: val1
pipeline:
  task1:
    action: print
    message: $params.p1 $params.p2
  task2:
    workflow: test
    p1: $params.p1
    p2: $params.p2  

Workflow process state

A workflow process uses context.State() to maintain execution state.

Variables is an abstraction with the capability to change workflow state.

A workflow variable defines a data transition between an input and an output state map.

In most cases the input and output state are the same underlying map stored in context.State().

In the following cases the input and output state refer to different maps:

  • post action execution: the input state map is built from the actual action response (e.g. an HTTP send response); the output is context.State()
  • post workflow execution: the input state map is context.State(); the output is the workflow.RunResponse.Data map

Workflow context.State() is shared between all sub-workflows if SharedStateMode is set in workflow.RunRequest. This flag is set by default for all inline workflow invocations.

In an inline workflow you can define variables in the 'init' section:

@var.yaml

pipeline:
 task1:
   init:
     - '!var1 = $params.greeting'
     - var2 = world
     - name: var3
       value:
         - 1
         - 2
     - var4 = $Len($var3) > 0 ? var3.length is $Len($var3) : nil
   action: print
   message: $var1 $var2 $var3 $var4
 task2:
   init:
     var0: abc
     varSlice:
       - 1
       - 2
       - 3   
     varMap:
        k1: v1
        k2: $var0
        k3: $varSlice
   action: print
   message: $varMap 

endly -r=var greeting=hello

Inline variables:

You can inline a variable by using '$' followed by the variable name. If the variable is surrounded by text or uses a sub-variable, enclose it in {} as in the following examples:

  • some text${variable}abc
  • ${array[${i}]}
  • ${array[${i}].id}
  • xx${array[${i}].id}yy
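
To make the nested forms above concrete, here is a minimal Go sketch that expands the innermost ${...} first and then resolves paths with [index] and .key segments. The resolve and expand helpers, the pass limit, and the supported path syntax are assumptions for illustration only, not endly's parser.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// innermost matches a ${...} expression that contains no nested expression.
var innermost = regexp.MustCompile(`\$\{([^${}]+)\}`)

// resolve walks a path such as array[1].id through nested maps and slices.
func resolve(path string, state map[string]interface{}) (interface{}, bool) {
	var current interface{} = state
	for _, part := range strings.Split(path, ".") {
		name, index := part, -1
		if open := strings.Index(part, "["); open != -1 && strings.HasSuffix(part, "]") {
			n, err := strconv.Atoi(part[open+1 : len(part)-1])
			if err != nil {
				return nil, false
			}
			name, index = part[:open], n
		}
		m, ok := current.(map[string]interface{})
		if !ok {
			return nil, false
		}
		if current, ok = m[name]; !ok {
			return nil, false
		}
		if index >= 0 {
			items, ok := current.([]interface{})
			if !ok || index >= len(items) {
				return nil, false
			}
			current = items[index]
		}
	}
	return current, true
}

// expand substitutes innermost expressions repeatedly (bounded to 10 passes).
func expand(text string, state map[string]interface{}) string {
	for pass := 0; pass < 10; pass++ {
		expanded := innermost.ReplaceAllStringFunc(text, func(match string) string {
			if value, ok := resolve(match[2:len(match)-1], state); ok {
				return fmt.Sprint(value)
			}
			return match // unknown keys are left untouched
		})
		if expanded == text {
			break
		}
		text = expanded
	}
	return text
}

func main() {
	state := map[string]interface{}{
		"i": 1,
		"array": []interface{}{
			map[string]interface{}{"id": 10},
			map[string]interface{}{"id": 20},
		},
	}
	fmt.Println(expand("xx${array[${i}].id}yy", state)) // prints: xx20yy
}
```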

If the value of a variable is a function, you can use $name.xx, where xx is the argument passed to the function:

  • ${uuid.next}, ${uuid.value}

The following variables, predefined in context.go, are functions:

  • env: returns an environment variable, e.g. ${env.HOME}
  • uuid: returns the previously generated UUID or the next instance
  • timestamp: returns a timestamp in ms for an expression such as ${timestamp.now} or ${timestamp.5hoursAgo}
  • unix: returns a timestamp in sec for an expression such as ${unix.tomorrow} or ${unix.5daysAhead}
  • tzTime: returns time formatted with time.RFC3339 (yyyy-MM-ddThh:mm:ss.SSS Z), e.g. ${tzTime.4daysAgoInUTC}
  • weekday: returns the weekday in the specified timezone, e.g. ${weekday.UTC}
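
One way to picture function-valued variables is a state entry that holds a function and receives the text after the first dot as its argument. The Go sketch below assumes a hypothetical stateFunc type and lookup helper; it does not reproduce how context.go registers these variables.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// stateFunc is a hypothetical signature for function-valued state entries.
type stateFunc func(argument string) interface{}

// lookup resolves "name.argument"; when state[name] is a function, it is called with the argument.
// Simplification: for non-function values the part after the dot is ignored.
func lookup(state map[string]interface{}, expression string) (interface{}, bool) {
	name, argument := expression, ""
	if index := strings.Index(expression, "."); index != -1 {
		name, argument = expression[:index], expression[index+1:]
	}
	value, ok := state[name]
	if !ok {
		return nil, false
	}
	if fn, isFunc := value.(stateFunc); isFunc {
		return fn(argument), true
	}
	return value, true
}

func main() {
	state := map[string]interface{}{
		"env": stateFunc(func(name string) interface{} { return os.Getenv(name) }),
		"app": "demo",
	}
	home, _ := lookup(state, "env.HOME") // behaves like ${env.HOME}
	app, _ := lookup(state, "app")
	fmt.Println(home, app)
}
```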

For more advanced usage you can also delegate variable declarations to a separate JSON file, e.g.:

@var.json

[
  {
    "Name": "catalinaOpts",
    "From": "params.catalinaOpts",
    "Value": "-Xms512m -Xmx1g -XX:MaxPermSize=256m"
  },
  {
    "Name": "buildRequest",
    "Value": {
      "BuildSpec": {
        "Name": "maven",
        "Version": "$mavenVersion",
        "Goal": "build",
        "BuildGoal": "$buildGoal",
        "Args": "$buildArgs",
        "Sdk": "jdk",
        "SdkVersion": "$jdkVersion"
      },
      "Target": "$buildTarget"
    }
  }
]

A variable has the following attributes:

  • Name: the key under which the value is stored in the state map, or one of the following expressions:

    • array element push ->, for instance ->collection, where collection is a key in the state map
    • reference $, for example $ref, where ref is a key in the state; in this case the value is stored under the key named by the content of ref
  • Value: a value of any type, used when the From value is empty

  • From: the name of a state key, or an expression with a key

  • When: criteria; if specified, the variable is set only if the criteria evaluate to true (they can use the $in and $out state variables)

  • Required: a flag that validates that From returns a non-empty value; otherwise an error is generated

  • Replace: a replacements map; if specified, substitutes the variable value with the corresponding value

The following expressions are supported:

  • number increment ++, for example counter++, where counter is a key in the state
  • array element shift <-, for example <-collection, where collection is a key in the state
  • reference $, for example $ref, where ref is a key in the state; in this case the value is evaluated as the value stored under the key pointed to by the content of ref
  • embedding UDF

Variable in actions:

| Operation | Variable.Name | Variable.Value | Variable.From | Input State Before | Input State After | Out State Before | Out State After |
|---|---|---|---|---|---|---|---|
| Assignment | key1 | [1,2,3] | n/a | n/a | n/a | { } | {"key1":[1,2,3]} |
| Assignment by reference | $key1 | 1 | n/a | {"key1":"a"} | n/a | { } | {"a":1} |
| Assignment | key1 | n/a | params.k1 | {"params":{"k1":100}} | n/a | { } | {"key1":100} |
| Assignment by reference | key1 | n/a | $k | {"k":"a", "a":100} | n/a | { } | {"key1":100} |
| Push | ->key1 | 1 | n/a | n/a | n/a | { } | {"key1":[1]} |
| Push | ->key1 | 2 | n/a | n/a | n/a | {"key1":[1]} | {"key1":[1,2]} |
| Shift | item | n/a | <-key1 | n/a | n/a | {"key1":[1, 2]} | {"key1":[2], "item":1} |
| Pre increment | key | n/a | ++i | {"i":100} | {"i":101} | {} | {"key":101} |
| Post increment | key | n/a | i++ | {"i":100} | {"i":101} | {} | {"key":100} |
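
The push, shift and increment rows can be reproduced with a small Go sketch. The Variable struct and its Apply method below are simplified, hypothetical versions written for this example: they only handle top-level keys and int counters, they omit assignment by reference, and the shift example uses the common case where input and output state are the same map.

```go
package main

import (
	"fmt"
	"strings"
)

// Variable is a simplified, hypothetical version of a workflow variable.
type Variable struct {
	Name  string
	Value interface{}
	From  string
}

// read evaluates From against the input state: <-key shifts, ++key and key++ increment.
func read(from string, in map[string]interface{}) interface{} {
	switch {
	case strings.HasPrefix(from, "<-"):
		key := from[2:]
		items, _ := in[key].([]interface{})
		if len(items) == 0 {
			return nil
		}
		in[key] = items[1:]
		return items[0]
	case strings.HasPrefix(from, "++"):
		key := from[2:]
		n, _ := in[key].(int)
		in[key] = n + 1
		return n + 1
	case strings.HasSuffix(from, "++"):
		key := strings.TrimSuffix(from, "++")
		n, _ := in[key].(int)
		in[key] = n + 1
		return n
	}
	return in[from]
}

// Apply moves a value from the input state to the output state.
func (v Variable) Apply(in, out map[string]interface{}) {
	value := v.Value // fallback when From yields nothing
	if v.From != "" {
		if fromValue := read(v.From, in); fromValue != nil {
			value = fromValue
		}
	}
	if strings.HasPrefix(v.Name, "->") { // push onto a collection
		key := v.Name[2:]
		items, _ := out[key].([]interface{})
		out[key] = append(items, value)
		return
	}
	out[v.Name] = value
}

func main() {
	state := map[string]interface{}{}
	Variable{Name: "->key1", Value: 1}.Apply(state, state)
	Variable{Name: "->key1", Value: 2}.Apply(state, state)
	Variable{Name: "item", From: "<-key1"}.Apply(state, state)
	fmt.Println(state) // prints: map[item:1 key1:[2]]

	in, out := map[string]interface{}{"i": 100}, map[string]interface{}{}
	Variable{Name: "key", From: "i++"}.Apply(in, out)
	fmt.Println(in, out) // prints: map[i:101] map[key:100]
}
```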

Workflow execution control:

By default, a workflow runs all specified tasks and subtasks, executing sync actions sequentially. All async actions are executed independently; a task completes when all of its actions have completed.

Action level criteria control

Each action has the following fields that support conditional expressions to control workflow execution:

  1. When: criteria to check if an action is eligible to run
  2. Skip: criteria to check if the whole group of actions with the same TagID can be skipped, continuing execution with the next group
  3. Repeater control
    type Repeater struct {
    	Extracts    Extracts  // textual regexp-based data extraction
    	Variables   Variables // structured data-based data extraction
    	Repeat      int       // how many times to send this request
    	SleepTimeMs int       // sleep time after a request is sent; this only makes sense with the Repeat option
    	Exit        string    // repeat exit criteria; it uses extracted variables to determine repeat termination
    }
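
The interplay of Repeat, SleepTimeMs and Exit can be pictured as a bounded polling loop. The Go sketch below is an assumption-laden illustration: the repeat function is hypothetical, the exit callback stands in for the Exit criteria, and skipping the sleep after the final attempt is a choice made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// repeat calls send up to `times` times, sleeping between attempts,
// and stops early when exit reports that the response satisfies the exit criteria.
func repeat(times, sleepTimeMs int, send func(attempt int) string, exit func(response string) bool) (string, int) {
	if times < 1 {
		times = 1
	}
	response := ""
	for attempt := 1; attempt <= times; attempt++ {
		response = send(attempt)
		if exit != nil && exit(response) {
			return response, attempt
		}
		if attempt < times {
			time.Sleep(time.Duration(sleepTimeMs) * time.Millisecond)
		}
	}
	return response, times
}

func main() {
	// A fake service that reports "ready" from the third call onwards.
	send := func(attempt int) string {
		if attempt >= 3 {
			return "ready"
		}
		return "pending"
	}
	response, attempts := repeat(10, 5, send, func(r string) bool { return r == "ready" })
	fmt.Println(response, attempts) // prints: ready 3
}
```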

Workflow goto task action

The workflow goto action terminates execution of the current task's actions and starts the specified task of the current workflow.

Workflow switch action

The workflow switch action branches execution based on the value of a specified context.State key. Note that switch does not terminate the next actions within the current task.

Error handling

If there is an error during workflow execution, the workflow fails immediately unless OnErrorTask is defined to catch and handle the error. In addition, an error key is placed into the config with the following content:

type WorkflowError struct {
	Error        string
	WorkflowName string
	TaskName     string
	Activity     *WorkflowServiceActivity
}

Finally, a workflow also offers DeferTask, which executes as the last workflow step whether or not there was an error, for instance to clean up a resource.
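
The relationship between a failing task, OnErrorTask and DeferTask resembles error handling with a deferred cleanup in Go. The sketch below is an analogy under stated assumptions: the task type and run function are hypothetical, the error is recorded as a plain string under an "error" key of the map passed to tasks, and execution stops after the error handler runs.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical unit of work operating on a shared map.
type task func(state map[string]interface{}) error

// errDeploy is a sample failure used by the demo below.
var errDeploy = errors.New("deploy failed")

// run executes tasks in order. On failure it records the error and calls onError
// when defined; deferTask always runs last, with or without an error.
func run(state map[string]interface{}, tasks []task, onError, deferTask task) (err error) {
	if deferTask != nil {
		defer func() {
			if deferErr := deferTask(state); err == nil {
				err = deferErr
			}
		}()
	}
	for _, t := range tasks {
		if err = t(state); err != nil {
			state["error"] = err.Error()
			if onError == nil {
				return err // fail immediately
			}
			return onError(state)
		}
	}
	return nil
}

func main() {
	state := map[string]interface{}{}
	tasks := []task{
		func(map[string]interface{}) error { return errDeploy },
	}
	onError := func(s map[string]interface{}) error {
		fmt.Println("handling:", s["error"])
		return nil
	}
	cleanup := func(map[string]interface{}) error {
		fmt.Println("cleanup")
		return nil
	}
	fmt.Println(run(state, tasks, onError, cleanup))
}
```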

Workflow Lifecycle

  1. A new context with a new state map is created after inheriting values from the caller. (The caller will not see any state changes from the downstream workflow.)
  2. The data key is published to the context state with the defined workflow.data. The workflow data field stores complex nested data structures, such as setup data.
  3. The params key is published to the state map with the caller parameters.
  4. Workflow initialization stage executes, applying variables defined in Workflow.Pre (input: workflow state, output: workflow state)
  5. Tasks execution
    1. Task eligibility determination:

      1. If the specified tasks are '*' or empty, all tasks defined in the workflow run sequentially; otherwise only the specified tasks run
      2. Evaluate When if specified
    2. Task initialization stage executes, applying variables defined in Task.Pre (input: workflow state, output: workflow state)

    3. Executes all eligible actions:

      1. Action eligibility determination:
        1. Evaluate When if specified, or Skip for all the actions within the same TagID (tag + Group + Index + Subpath)
      2. Action initialization stage executes, applying variables defined in Action.Pre (input: workflow state, output: workflow state)
      3. Executing the action on the specified service
      4. Action post stage executes, applying variables defined in Action.Post (input: action.response, output: workflow state). The response converted to a map is also published to the workflow state under the key defined by COALESCE(action.Name, action.Action)
    4. Task post stage executes, applying variables defined in Task.Post (input: state, output: state)

  6. Workflow post stage executes, applying variables defined in Workflow.Post (input: workflow state, output: workflow.response)
  7. Context state comes with the following built-in/reserved keys:
    • rand - random int64
    • date - current date formatted as yyyy-MM-dd
    • time - current time formatted as yyyy-MM-dd hh:mm:ss
    • ts - current timestamp formatted as yyyyMMddhhmmSSS
    • timestamp.XXX - timestamp in ms, where XXX is a time diff expression, e.g. 3DaysAgo, tomorrow, hourAhead
    • unix.XXX - timestamp in sec, where XXX is a time diff expression, e.g. 3DaysAgo, tomorrow, hourAhead
    • tzTime.XXX - RFC3339 formatted time, where XXX is a time diff expression, e.g. 3DaysAgo, tomorrow, hourAhead
    • elapsedToday.locale, e.g. ${elapsedToday.UTC}
    • remainingToday.locale, e.g. ${remainingToday.Poland}
    • tmpDir - temp directory
    • uuid.next - generates a unique id
    • uuid.Get - returns the previously generated unique id, or generates a new one
    • env.XXX, where XXX is the ID of the env variable to return
    • registered user-defined functions (UDFs)
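
Two of the lifecycle steps lend themselves to a short Go sketch: creating an isolated state for a downstream workflow (steps 1 to 3) and publishing an action response under COALESCE(action.Name, action.Action). The helpers newWorkflowState and publishResponse are hypothetical, and the copy is shallow (top-level keys only), which is a simplification.

```go
package main

import "fmt"

// newWorkflowState copies the caller's top-level keys into a fresh map
// and publishes the data and params keys.
func newWorkflowState(caller, data, params map[string]interface{}) map[string]interface{} {
	state := make(map[string]interface{}, len(caller)+2)
	for key, value := range caller {
		state[key] = value
	}
	state["data"] = data
	state["params"] = params
	return state
}

// publishResponse stores the action response under the action name,
// falling back to the action identifier when the name is empty.
func publishResponse(state map[string]interface{}, name, action string, response map[string]interface{}) {
	key := name
	if key == "" {
		key = action
	}
	state[key] = response
}

func main() {
	caller := map[string]interface{}{"region": "us-west"}
	state := newWorkflowState(caller, nil, map[string]interface{}{"p1": "val1"})
	state["region"] = "eu-central" // not visible to the caller
	publishResponse(state, "", "print", map[string]interface{}{"Printed": true})
	fmt.Println(caller["region"], state["region"], state["print"])
	// prints: us-west eu-central map[Printed:true]
}
```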

Best Practice

  1. Delegate each new workflow request to a dedicated req/ folder.
  2. Variables controlling workflow state (Init, Post) should only define state; if you decide to delegate them to an external file, use a var/ folder.
  3. Flag a variable as Required or provide a fallback Value when applicable.
  4. Group functionally similar tasks into a reusable workflow.
  5. For a complex workflow such as regression, consider using a directory layout like the following example:


      endly
        |- run.yaml
        |- system.yaml              
        |- app.yaml
        |- datastore.yaml
        |
        |- regression /
        |       | - regression[.csv|.yaml]
        |       | - var/init.json (workflow init variables)
        |       | - <use_case_group1> / 1 ... 00X (Tag Iterator)/ <test assets>
        |       | 
        |       | - <use_case_groupN> / 1 ... 00Y (Tag Iterator)/ <test assets>
        | - config /
        |       
        | - datastore / db name 
                         | - dictionary /
                         | - schema.ddl