Feature / Runtime child jobs, parallel and sequential job groups (#465)
* Suggested metadata for handling sequential and parallel job groups

* Make the graph builder hold stateful information on job resources, namespace, etc.

* Use EJobValidation for errors in the graph builder

* Make the graph builder try to continue after it hits an error

* Add graph definitions, functions and builder logic to express child jobs

* Engine updates to handle processing for child jobs

* Update dev mode translator to handle job recursion (resources are stateful, job def is stateless)

* Update code that depends on the new dev mode translator

* Create an example import -> process -> export job group

* Fix one warning in the graph module

* Add some extra paths to .gitignore for example data

* Add the import -> process -> export group job to the CI job for example models

* Minor fixes and tidy-ups

* Remove an unneeded code change
Martin Traverse authored Oct 31, 2024
1 parent c6cd697 commit 0cc91e9
Showing 13 changed files with 922 additions and 404 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -37,6 +37,8 @@

/examples/models/python/data/outputs/**
/examples/models/python/data/data/**
/examples/models/python/data/primary/**
/examples/models/python/data/generated/**
/examples/models/python/data/exports/**
!/examples/models/python/data/exports/.keep
/examples/apps/javascript/node_modules/**
49 changes: 49 additions & 0 deletions examples/models/python/config/job_group.yaml
@@ -0,0 +1,49 @@


job:
  jobGroup:
    sequential:
      jobs:

        - importData:

            model: tutorial.data_import.SimpleDataImport

            parameters:
              storage_key: staging_data
              source_file: sample_data.parquet

            outputs:
              customer_loans: primary/data_import/customer_loans.csv

            storageAccess:
              - staging_data

        - runModel:

            model: tutorial.using_data.UsingDataModel

            parameters:
              eur_usd_rate: 1.2071
              default_weighting: 1.5
              filter_defaults: false

            inputs:
              customer_loans: primary/data_import/customer_loans.csv

            outputs:
              profit_by_region: generated/using_data/profit_by_region.csv

        - exportData:

            model: tutorial.data_export.DataExportExample

            parameters:
              storage_key: exported_data
              export_comment: "Exporting some example data"

            inputs:
              profit_by_region: generated/using_data/profit_by_region.csv

            storageAccess:
              - exported_data
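This group config runs locally through the runtime's dev-mode launcher (the same call used by job_group.py further down); a minimal sketch:

    import tracdap.rt.launch as launch

    # Dev mode fills in the boilerplate job metadata, so the shorthand
    # group definition above can be run directly from the YAML file
    launch.launch_job("config/job_group.yaml", "config/sys_config.yaml", dev_mode=True)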
30 changes: 30 additions & 0 deletions examples/models/python/src/tutorial/data_import.py
@@ -18,6 +18,8 @@
import pandas as pd
import pytz

import tutorial.schemas as schemas


class BulkDataImport(trac.TracDataImport):

@@ -149,6 +151,34 @@ def run_model(self, ctx: trac.TracDataContext):

        ctx.log().warning(f"Requested table [{table_name}] not found in storage [{storage_key}]")


class SimpleDataImport(trac.TracDataImport):

    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:

        return trac.define_parameters(
            trac.P("storage_key", trac.STRING, "TRAC external storage key"),
            trac.P("source_file", trac.STRING, "Path of the source data file in external storage"))

    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:

        customer_loans = trac.load_schema(schemas, "customer_loans.csv")

        return {"customer_loans": trac.ModelOutputSchema(customer_loans)}

    def run_model(self, ctx: trac.TracDataContext):

        storage_key = ctx.get_parameter("storage_key")
        storage = ctx.get_file_storage(storage_key)

        storage_file = ctx.get_parameter("source_file")

        with storage.read_byte_stream(storage_file) as file_stream:

            dataset = pd.read_parquet(file_stream)
            ctx.put_pandas_table("customer_loans", dataset)


if __name__ == "__main__":
    import tracdap.rt.launch as launch
    launch.launch_model(BulkDataImport, "config/data_import.yaml", "config/sys_config.yaml")
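SimpleDataImport can be launched standalone in the same way; a minimal sketch, assuming a separate job config is written for it (the config path here is hypothetical):

    import tracdap.rt.launch as launch
    from tutorial.data_import import SimpleDataImport

    # Hypothetical standalone config for the simple import job
    launch.launch_model(SimpleDataImport, "config/simple_data_import.yaml", "config/sys_config.yaml")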
17 changes: 17 additions & 0 deletions examples/models/python/src/tutorial/job_group.py
@@ -0,0 +1,17 @@
# Copyright 2024 Accenture Global Solutions Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import tracdap.rt.launch as launch

launch.launch_job("config/job_group.yaml", "config/sys_config.yaml", dev_mode=True)
@@ -45,6 +45,9 @@ enum JobType {

    /// Export data to external locations
    EXPORT_DATA = 5;

    /// A job built from a collection of other jobs
    JOB_GROUP = 6;
}

@@ -98,6 +101,7 @@ message JobDefinition {
        ImportModelJob importModel = 4;
        ImportDataJob importData = 5;
        ExportDataJob exportData = 6;
        JobGroup jobGroup = 7;
    }
}

@@ -187,3 +191,44 @@ message ExportDataJob {

    repeated TagUpdate outputAttrs = 8;
}


/**
 * Specify the group type for a JOB_GROUP job
 */
enum JobGroupType {
    JOB_GROUP_TYPE_NOT_SET = 0;
    SEQUENTIAL_JOB_GROUP = 1;
    PARALLEL_JOB_GROUP = 2;
}

/**
 * Specification for a JOB_GROUP job, which runs a collection of other jobs
 */
message JobGroup {

    JobGroupType jobGroupType = 1;

    oneof jobGroupDetails {
        SequentialJobGroup sequential = 2;
        ParallelJobGroup parallel = 3;
    }
}

/**
 * A job group where each job runs in sequence
 */
message SequentialJobGroup {

    repeated JobDefinition jobs = 1;
}

/**
 * A job group where all jobs run in parallel
 */
message ParallelJobGroup {

    repeated JobDefinition jobs = 1;
}
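For illustration, a sequential group can also be assembled programmatically; a minimal sketch in Python, assuming the runtime's generated metadata classes in tracdap.rt.metadata mirror the message and field names above, with the two child definitions left as bare placeholders:

    import tracdap.rt.metadata as meta

    # Placeholder child jobs, in practice these would carry full job details
    child_job_1 = meta.JobDefinition(jobType=meta.JobType.IMPORT_DATA)
    child_job_2 = meta.JobDefinition(jobType=meta.JobType.RUN_MODEL)

    # A job group is itself a JobDefinition, so groups can nest recursively
    group_job = meta.JobDefinition(
        jobType=meta.JobType.JOB_GROUP,
        jobGroup=meta.JobGroup(
            jobGroupType=meta.JobGroupType.SEQUENTIAL_JOB_GROUP,
            sequential=meta.SequentialJobGroup(jobs=[child_job_1, child_job_2])))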

