Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: update compute task workflow #365

Merged
merged 10 commits into from
Feb 13, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Rename `Function` addressable to `Archive` ([#288](https://github.com/Substra/orchestrator/pull/288))
- Renamed `compute_task_key`by `asset_key` in `FailureReport` ([#277](https://github.com/Substra/orchestrator/pull/277))
- `FailureReport` now can be reference a `ComputeTask` or a `Function` through `asset_key` + `asset_type` ([#277](https://github.com/Substra/orchestrator/pull/277))
- Logic to determine new compute task status takes in account the status of the function. A new task can now be created with the status `FAILED`or `CANCELLED` (if the function reached the corresponding status) ([#365](https://github.com/Substra/orchestrator/pull/365))
- BREAKING: Transition to status `TODO` for a given compute task is done after the function is built([#365](https://github.com/Substra/orchestrator/pull/365))
guilhem-barthes marked this conversation as resolved.
Show resolved Hide resolved

### Fixed

Expand Down
5 changes: 5 additions & 0 deletions e2e/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,11 @@ func (c *TestClient) SetReadyFunction(keyRef string) {
c.applyFunctionAction(keyRef, asset.FunctionAction_FUNCTION_ACTION_READY)
}

func (c *TestClient) SetReadyFromWaitingFunction(keyRef string) {
c.BuildFunction(keyRef)
c.SetReadyFunction(keyRef)
}

func (c *TestClient) applyFunctionAction(keyRef string, action asset.FunctionAction) {
functionKey := c.ks.GetKey(keyRef)
c.logger.Debug().Str("functionKey", functionKey).Str("action", action.String()).Msg("applying function action")
Expand Down
20 changes: 20 additions & 0 deletions e2e/computeplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func TestCancelComputePlan(t *testing.T) {
WithInput("local", &client.TaskOutputRef{TaskRef: "cmp2", Identifier: "local"}).
WithInput("shared", &client.TaskOutputRef{TaskRef: "agg1", Identifier: "model"}))

// Set the function to the right status.
appClient.SetReadyFromWaitingFunction("aggFunction")
appClient.SetReadyFromWaitingFunction("compFunction")

// We start processing the compute plan
appClient.StartTask("cmp1")
appClient.StartTask("cmp2")
Expand Down Expand Up @@ -182,6 +186,10 @@ func TestMultiStageComputePlan(t *testing.T) {
log.Fatal().Int32("rank", lastAggregate.Rank).Msg("last aggegation task has not expected rank")
}

// Set the function to the right status.
appClient.SetReadyFromWaitingFunction("functionComp")
appClient.SetReadyFromWaitingFunction("functionAgg")

// Start step 1
appClient.StartTask("compA1")
appClient.StartTask("compB1")
Expand Down Expand Up @@ -302,6 +310,7 @@ func TestBatchLargeComputePlan(t *testing.T) {
nbQuery := 5000 // 10k exceed max response size

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
Expand Down Expand Up @@ -385,6 +394,8 @@ func TestAggregateComposite(t *testing.T) {
WithInput("shared", &client.TaskOutputRef{TaskRef: "a1", Identifier: "model"}).
WithInput("local", &client.TaskOutputRef{TaskRef: "c1", Identifier: "local"}))

appClient.SetReadyFromWaitingFunction(client.DefaultCompositeFunctionRef)
appClient.SetReadyFromWaitingFunction("aggFunction")
appClient.StartTask("c1")
models := []*client.ModelOptions{
client.DefaultModelOptions().WithTaskRef("c1").WithKeyRef("m1H").WithTaskOutput("local"),
Expand Down Expand Up @@ -428,6 +439,9 @@ func TestFailLargeComputePlan(t *testing.T) {
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())

appClient.SetReadyFromWaitingFunction("functionComp")
appClient.SetReadyFromWaitingFunction("functionAgg")

newTasks := make([]client.Taskable, 0)
start := time.Now()
for i := 0; i < nbRounds; {
Expand Down Expand Up @@ -519,6 +533,7 @@ func TestGetComputePlan(t *testing.T) {
require.Nil(t, plan.FailureDate)
require.Nil(t, plan.CancelationDate)

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)

appClient.FailTask("task#1")
Expand All @@ -545,6 +560,7 @@ func TestCompositeParentChild(t *testing.T) {
WithInput("local", &client.TaskOutputRef{TaskRef: "comp1", Identifier: "local"}).
WithInput("shared", &client.TaskOutputRef{TaskRef: "comp1", Identifier: "shared"}))

appClient.SetReadyFromWaitingFunction("functionComp")
appClient.StartTask("comp1")
appClient.RegisterModel(client.DefaultModelOptions().WithTaskRef("comp1").WithKeyRef("model1H").WithTaskOutput("local"))
appClient.RegisterModel(client.DefaultModelOptions().WithTaskRef("comp1").WithKeyRef("model1T").WithTaskOutput("shared"))
Expand Down Expand Up @@ -603,12 +619,15 @@ func TestDisableTransientOutput(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.RegisterFunction(client.DefaultPredictFunctionOptions())
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
appClient.RegisterTasks(client.DefaultTrainTaskOptions().WithOutput("model", &asset.NewPermissions{Public: true}, true))
appClient.RegisterTasks(client.DefaultTrainTaskOptions().WithKeyRef("child1").WithInput("model", &client.TaskOutputRef{TaskRef: client.DefaultTrainTaskRef, Identifier: "model"}))

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.SetReadyFromWaitingFunction(client.DefaultPredictFunctionRef)
// First task done
appClient.StartTask(client.DefaultTrainTaskRef)
appClient.RegisterModel(client.DefaultModelOptions().WithKeyRef("model0"))
Expand Down Expand Up @@ -647,6 +666,7 @@ func TestIsPlanRunning(t *testing.T) {
resp = appClient.IsPlanRunning(client.DefaultPlanRef)
require.True(t, resp.IsRunning)

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)

resp = appClient.IsPlanRunning(client.DefaultPlanRef)
Expand Down
21 changes: 21 additions & 0 deletions e2e/computetask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestTrainTaskLifecycle(t *testing.T) {
appClient.RegisterTasks(client.DefaultTrainTaskOptions())
appClient.RegisterTasks(client.DefaultTrainTaskOptions().WithKeyRef("anotherTask").
WithInput("model", &client.TaskOutputRef{TaskRef: client.DefaultTrainTaskRef, Identifier: "model"}))
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)
}

Expand All @@ -94,6 +95,10 @@ func TestPredictTaskLifecycle(t *testing.T) {
WithInput("predictions", &client.TaskOutputRef{TaskRef: "predict", Identifier: "predictions"}).
WithFunctionRef("metric"))

appClient.SetReadyFromWaitingFunction("train_function")
appClient.SetReadyFromWaitingFunction("predict_function")
appClient.SetReadyFromWaitingFunction("metric")

appClient.StartTask("train")
appClient.RegisterModel(client.DefaultModelOptions().WithTaskRef("train").WithKeyRef("train_end"))
appClient.DoneTask("train")
Expand Down Expand Up @@ -125,6 +130,7 @@ func TestCascadeCancel(t *testing.T) {
WithInput("model", &client.TaskOutputRef{TaskRef: client.DefaultTrainTaskRef, Identifier: "model"}))
}

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)
appClient.CancelTask(client.DefaultTrainTaskRef)

Expand Down Expand Up @@ -153,6 +159,7 @@ func TestCascadeTodo(t *testing.T) {
WithInput("model", &client.TaskOutputRef{TaskRef: client.DefaultTrainTaskRef, Identifier: "model"}))
}

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)
appClient.RegisterModel(client.DefaultModelOptions())
appClient.DoneTask(client.DefaultTrainTaskRef)
Expand Down Expand Up @@ -183,6 +190,7 @@ func TestCascadeFailure(t *testing.T) {
WithInput("model", &client.TaskOutputRef{TaskRef: client.DefaultTrainTaskRef, Identifier: "model"}))
}

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)
appClient.FailTask(client.DefaultTrainTaskRef)

Expand Down Expand Up @@ -350,6 +358,9 @@ func TestQueryTaskInputs(t *testing.T) {

otherTaskOptions := client.DefaultTrainTaskOptions().WithKeyRef(otherTaskRef).WithPlanRef(cpRef)
appClient.RegisterTasks(otherTaskOptions)
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.SetReadyFromWaitingFunction(parentFunctionRef)
appClient.SetReadyFromWaitingFunction(childFunctionRef)
appClient.StartTask(otherTaskRef)
appClient.RegisterModel(client.DefaultModelOptions().WithKeyRef(inputModelRef).WithTaskRef(otherTaskRef))

Expand Down Expand Up @@ -455,6 +466,8 @@ func TestEventsDuringComputeTaskLifecycle(t *testing.T) {
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
registeredTask := appClient.RegisterTasks(client.DefaultTrainTaskOptions())[0]

getEventTask := func(eventKind asset.EventKind) *asset.ComputeTask {
Expand All @@ -464,6 +477,7 @@ func TestEventsDuringComputeTaskLifecycle(t *testing.T) {
EventKind: eventKind,
}, "", 100)

// One for the fr
require.Len(t, res.Events, 1)
return res.Events[0].GetComputeTask()
}
Expand Down Expand Up @@ -501,6 +515,8 @@ func TestWorkerCancelTaskInFailedComputePlan(t *testing.T) {
WithInput("model", &client.TaskOutputRef{TaskRef: "trainTask1", Identifier: "model"}).
WithWorker("MyOrg2MSP"))

client1.SetReadyFromWaitingFunction("trainFunction")
client1.SetReadyFromWaitingFunction("aggFunction")
client1.StartTask("trainTask1")
client1.RegisterModel(client.DefaultModelOptions().WithTaskRef("trainTask1"))

Expand All @@ -526,6 +542,8 @@ func TestGetTaskInputAssets(t *testing.T) {
client.DefaultTrainTaskOptions().WithKeyRef("train2").WithFunctionRef("train_function"), // Make sure we have several models as aggregate input to check order
)

appClient.SetReadyFromWaitingFunction("train_function")

// Check that train has expected inputs
trainInputs := appClient.GetTaskInputAssets("train")
assert.Len(t, trainInputs, 2)
Expand All @@ -550,6 +568,7 @@ func TestGetTaskInputAssets(t *testing.T) {
}).
WithKeyRef("aggregate_function"),
)
appClient.SetReadyFromWaitingFunction("aggregate_function")
appClient.RegisterTasks(
client.DefaultAggregateTaskOptions().
WithInput("sink", &client.TaskOutputRef{TaskRef: "train", Identifier: "model"}).
Expand Down Expand Up @@ -592,6 +611,7 @@ func TestGetTaskInputAssetsFromComposite(t *testing.T) {
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())

appClient.SetReadyFromWaitingFunction("comp_function")
appClient.RegisterTasks(
client.DefaultCompositeTaskOptions().WithKeyRef("comp1").WithFunctionRef("comp_function"),
client.DefaultCompositeTaskOptions().WithKeyRef("comp2").WithFunctionRef("comp_function"),
Expand Down Expand Up @@ -621,6 +641,7 @@ func TestGetTaskInputAssetsFromComposite(t *testing.T) {
}).
WithKeyRef("aggregate_function"),
)
appClient.SetReadyFromWaitingFunction("aggregate_function")
appClient.RegisterTasks(
client.DefaultAggregateTaskOptions().
WithInput("sink", &client.TaskOutputRef{TaskRef: "comp1", Identifier: "shared"}).
Expand Down
3 changes: 3 additions & 0 deletions e2e/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestSubscribeReplayEvents(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
plan := appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
Expand Down Expand Up @@ -89,6 +90,7 @@ func TestSubscribeEventsEmittedWhileSubscribed(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
plan := appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
Expand All @@ -115,6 +117,7 @@ func TestSubscribeEventsEmittedWhileSubscribed(t *testing.T) {
func TestSubscribeReplayThenListen(t *testing.T) {
appClient := factory.NewTestClient()
appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
manager := appClient.RegisterDataManager(client.DefaultDataManagerOptions())

replayedSamples := make([]*asset.DataSample, 5)
Expand Down
1 change: 1 addition & 0 deletions e2e/failure_report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestRegisterFailureReport(t *testing.T) {
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
appClient.RegisterTasks(client.DefaultTrainTaskOptions())

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)

registeredFailureReport := appClient.RegisterTaskFailureReport(client.DefaultTrainTaskRef)
Expand Down
4 changes: 4 additions & 0 deletions e2e/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ func TestRegisterModel(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)

appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
Expand Down Expand Up @@ -56,6 +58,7 @@ func TestRegisterTwoSimpleModelsForTrainTask(t *testing.T) {
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
appClient.RegisterTasks(client.DefaultTrainTaskOptions())

appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.StartTask(client.DefaultTrainTaskRef)
_, err := appClient.FailableRegisterModels(
client.DefaultModelOptions().WithKeyRef("mod1"),
Expand All @@ -75,6 +78,7 @@ func TestRegisterAllModelsForCompositeTask(t *testing.T) {
appClient.RegisterComputePlan(client.DefaultComputePlanOptions())
appClient.RegisterTasks(client.DefaultCompositeTaskOptions())

appClient.SetReadyFromWaitingFunction(client.DefaultCompositeFunctionRef)
appClient.StartTask(client.DefaultCompositeTaskRef)
appClient.RegisterModels(
client.DefaultModelOptions().WithTaskRef(client.DefaultCompositeTaskRef).WithKeyRef("mod1").WithTaskOutput("local"),
Expand Down
12 changes: 12 additions & 0 deletions e2e/performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestRegisterPerformance(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions().WithKeyRef("testds"))
Expand All @@ -29,6 +30,7 @@ func TestRegisterPerformance(t *testing.T) {
appClient.DoneTask(client.DefaultTrainTaskRef)

appClient.RegisterFunction(client.DefaultPredictFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultPredictFunctionRef)

appClient.RegisterTasks(client.DefaultPredictTaskOptions().
WithKeyRef("predictTask").
Expand All @@ -39,6 +41,7 @@ func TestRegisterPerformance(t *testing.T) {
appClient.DoneTask("predictTask")

appClient.RegisterFunction(client.DefaultMetricFunctionOptions().WithKeyRef("testmetric"))
appClient.SetReadyFromWaitingFunction("testmetric")
appClient.RegisterTasks(client.DefaultTestTaskOptions().
WithKeyRef("testTask").
WithFunctionRef("testmetric").
Expand Down Expand Up @@ -76,6 +79,7 @@ func TestRegisterMultiplePerformances(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions().WithKeyRef("testds"))
Expand All @@ -86,6 +90,7 @@ func TestRegisterMultiplePerformances(t *testing.T) {
appClient.DoneTask(client.DefaultTrainTaskRef)

appClient.RegisterFunction(client.DefaultPredictFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultPredictFunctionRef)
appClient.RegisterTasks(client.DefaultPredictTaskOptions().
WithKeyRef("predictTask").
WithDataSampleRef("testds").
Expand All @@ -95,6 +100,7 @@ func TestRegisterMultiplePerformances(t *testing.T) {
appClient.DoneTask("predictTask")

appClient.RegisterFunction(client.DefaultMetricFunctionOptions().WithKeyRef("testmetric"))
appClient.SetReadyFromWaitingFunction("testmetric")
appClient.RegisterTasks(client.DefaultTestTaskOptions().
WithKeyRef("testTask").
WithDataSampleRef("testds").
Expand All @@ -114,6 +120,7 @@ func TestRegisterMultiplePerformancesForSameTaskOutput(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions().WithKeyRef("testds"))
Expand All @@ -124,6 +131,7 @@ func TestRegisterMultiplePerformancesForSameTaskOutput(t *testing.T) {
appClient.DoneTask(client.DefaultTrainTaskRef)

appClient.RegisterFunction(client.DefaultPredictFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultPredictFunctionRef)

appClient.RegisterTasks(client.DefaultPredictTaskOptions().
WithKeyRef("predictTask").
Expand All @@ -134,6 +142,7 @@ func TestRegisterMultiplePerformancesForSameTaskOutput(t *testing.T) {
appClient.DoneTask("predictTask")

appClient.RegisterFunction(client.DefaultMetricFunctionOptions().WithKeyRef("testmetric"))
appClient.SetReadyFromWaitingFunction("testmetric")
appClient.RegisterTasks(client.DefaultTestTaskOptions().
WithKeyRef("testTask").
WithFunctionRef("testmetric").
Expand All @@ -159,6 +168,7 @@ func TestQueryPerformances(t *testing.T) {
appClient := factory.NewTestClient()

appClient.RegisterFunction(client.DefaultSimpleFunctionOptions())
appClient.SetReadyFromWaitingFunction(client.DefaultSimpleFunctionRef)
appClient.RegisterDataManager(client.DefaultDataManagerOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions())
appClient.RegisterDataSample(client.DefaultDataSampleOptions().WithKeyRef("testds"))
Expand All @@ -169,6 +179,7 @@ func TestQueryPerformances(t *testing.T) {
appClient.DoneTask(client.DefaultTrainTaskRef)

appClient.RegisterFunction(client.DefaultPredictFunctionOptions().WithKeyRef("predictFunction"))
appClient.SetReadyFromWaitingFunction("predictFunction")
appClient.RegisterTasks(client.DefaultPredictTaskOptions().
WithKeyRef("predictTask").
WithDataSampleRef("testds").
Expand All @@ -179,6 +190,7 @@ func TestQueryPerformances(t *testing.T) {
appClient.DoneTask("predictTask")

appClient.RegisterFunction(client.DefaultMetricFunctionOptions().WithKeyRef("testmetric"))
appClient.SetReadyFromWaitingFunction("testmetric")
appClient.RegisterTasks(client.DefaultTestTaskOptions().
WithKeyRef("testTask").
WithDataSampleRef("testds").
Expand Down
2 changes: 1 addition & 1 deletion lib/persistence/computetask_dbal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ComputeTaskDBAL interface {
// GetComputePlanTasks returns the tasks of the compute plan identified by the given key
GetComputePlanTasks(key string) ([]*asset.ComputeTask, error)
GetComputePlanTasksKeys(key string) ([]string, error)
GetFunctionRunnableTasks(key string) ([]*asset.ComputeTask, error)
GetFunctionFromTasksWithStatus(key string, statuses []asset.ComputeTaskStatus) ([]*asset.ComputeTask, error)
AddComputeTaskOutputAsset(output *asset.ComputeTaskOutputAsset) error
// CountComputeTaskRegisteredOutputs returns the number of registered outputs by identifier
CountComputeTaskRegisteredOutputs(key string) (ComputeTaskOutputCounter, error)
Expand Down
Loading
Loading