Skip to content

Commit

Permalink
Update unit tests for task and node execution counts
Browse files Browse the repository at this point in the history
  • Loading branch information
sshardool committed Mar 11, 2024
1 parent f9864e7 commit 1254afa
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 11 deletions.
128 changes: 128 additions & 0 deletions flytepropeller/pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
},
nil,
)
executionContext.OnIncrementNodeExecutionCount().Return(1)
executionContext.OnIncrementTaskExecutionCount().Return(1)
executionContext.OnCurrentNodeExecutionCount().Return(1)
executionContext.OnCurrentTaskExecutionCount().Return(1)
nCtx.OnExecutionContext().Return(executionContext)

// EventsRecorder
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID())
nodePhase := nodeStatus.GetPhase()

if nodePhase == v1alpha1.NodePhaseRunning {
if nodePhase == v1alpha1.NodePhaseRunning && execContext != nil {
execContext.IncrementNodeExecutionCount()
if currentNode.GetKind() == v1alpha1.NodeKindTask {
execContext.IncrementTaskExecutionCount()
Expand Down Expand Up @@ -296,7 +296,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
}), nil
}

logger.Infof(ctx, "F3 starting node id %v, ", downstreamNode.GetID())
logger.Debugf(ctx, "downstream handler starting node id %v, ", downstreamNode.GetID())
state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode)
if err != nil {
return interfaces.NodeStatusUndefined, err
Expand Down
39 changes: 30 additions & 9 deletions flytepropeller/pkg/controller/workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/fakeplugins"
wfErrors "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflow/errors"
execStats "github.com/flyteorg/flyte/flytepropeller/pkg/controller/workflowstore"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
Expand Down Expand Up @@ -248,7 +249,10 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) {
nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient,
maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope())
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())

execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)

assert.NoError(t, executor.Initialize(ctx))
Expand Down Expand Up @@ -332,7 +336,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) {
maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope())
assert.NoError(t, err)

executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)

assert.NoError(t, executor.Initialize(ctx))
Expand Down Expand Up @@ -396,7 +402,9 @@ func BenchmarkWorkflowExecutor(b *testing.B) {
maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, scope)
assert.NoError(b, err)

executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(b, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(b, err)

assert.NoError(b, executor.Initialize(ctx))
Expand Down Expand Up @@ -507,7 +515,10 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) {
nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient,
maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope())
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())

execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)

assert.NoError(t, executor.Initialize(ctx))
Expand Down Expand Up @@ -609,7 +620,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) {
nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient,
maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope())
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)

assert.NoError(t, executor.Initialize(ctx))
Expand Down Expand Up @@ -684,7 +697,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
Cause: errors.New("already exists"),
}
}
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)
w := &v1alpha1.FlyteWorkflow{}
assert.NoError(t, json.Unmarshal(wJSON, w))
Expand All @@ -703,7 +718,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
Cause: errors.New("already exists"),
}
}
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)
w := &v1alpha1.FlyteWorkflow{}
assert.NoError(t, json.Unmarshal(wJSON, w))
Expand All @@ -719,7 +736,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
Cause: errors.New("generic exists"),
}
}
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)
w := &v1alpha1.FlyteWorkflow{}
assert.NoError(t, json.Unmarshal(wJSON, w))
Expand All @@ -736,7 +755,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) {
Cause: errors.New("incompatible cluster"),
}
}
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope())
execStatsHolder, err := execStats.NewExecutionStatsHolder()
assert.NoError(t, err)
executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope(), execStatsHolder)
assert.NoError(t, err)
w := &v1alpha1.FlyteWorkflow{}
assert.NoError(t, json.Unmarshal(wJSON, w))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ func TestAggregateActiveValues(t *testing.T) {
assert.Equal(t, uint32(16), tasks)
}

// Test removal on an empty ExecutionStatsHolder
func TestRemoveTerminatedExecutionsEmpty(t *testing.T) {
esh, err := NewExecutionStatsHolder()
assert.NoError(t, err)

err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{})
assert.NoError(t, err)

err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{"exec1": true})
assert.NoError(t, err)
}

// Test removal of a subset of entries from ExcutionStatsHolder
func TestRemoveTerminatedExecutionsSubset(t *testing.T) {
esh, err := NewExecutionStatsHolder()
assert.NoError(t, err)

esh.AddOrUpdateEntry("exec1", SingleExecutionStats{ActiveNodeCount: 5, ActiveTaskCount: 10})
esh.AddOrUpdateEntry("exec2", SingleExecutionStats{ActiveNodeCount: 3, ActiveTaskCount: 6})

err = esh.RemoveTerminatedExecutions(context.TODO(), map[string]bool{"exec2": true})
assert.NoError(t, err)

assert.Equal(t, 1, len(esh.executions))
assert.Equal(t, uint32(3), esh.executions["exec2"].ActiveNodeCount)
assert.Equal(t, uint32(6), esh.executions["exec2"].ActiveTaskCount)
}

func TestConcurrentAccess(t *testing.T) {
esh, err := NewExecutionStatsHolder()
assert.NoError(t, err)
Expand Down

0 comments on commit 1254afa

Please sign in to comment.