Skip to content

Commit

Permalink
Support ArrayNode subNode timeouts (#6054)
Browse files Browse the repository at this point in the history
* storing delta timestamps to set subnode lastattemptstartedat

Signed-off-by: Daniel Rammer <[email protected]>

* remove unnecessary print

Signed-off-by: Daniel Rammer <[email protected]>

* tests passing - still need to add validation

Signed-off-by: Daniel Rammer <[email protected]>

* added unit test

Signed-off-by: Daniel Rammer <[email protected]>

* fixed linter

Signed-off-by: Daniel Rammer <[email protected]>

* make generate

Signed-off-by: Daniel Rammer <[email protected]>

* only setting timestamps if SubNodeDeltaTimestamps has been initialized

Signed-off-by: Daniel Rammer <[email protected]>

* correctly checking ItemsCount

Signed-off-by: Daniel Rammer <[email protected]>

* validating SubNodeDeltaTimestamps based on underlying BitSet

Signed-off-by: Daniel Rammer <[email protected]>

* tmp update to strip timeouts and retries for ArrayNode

Signed-off-by: Daniel Rammer <[email protected]>

* removed tmp ArrayNode NodeSpec field overrides

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
hamersaw authored Dec 5, 2024
1 parent 1332408 commit 76c7f76
Showing 9 changed files with 208 additions and 35 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
@@ -290,6 +290,7 @@ type ExecutableArrayNodeStatus interface {
GetSubNodeTaskPhases() bitarray.CompactArray
GetSubNodeRetryAttempts() bitarray.CompactArray
GetSubNodeSystemFailures() bitarray.CompactArray
GetSubNodeDeltaTimestamps() bitarray.CompactArray
GetTaskPhaseVersion() uint32
}

@@ -302,6 +303,7 @@ type MutableArrayNodeStatus interface {
SetSubNodeTaskPhases(subNodeTaskPhases bitarray.CompactArray)
SetSubNodeRetryAttempts(subNodeRetryAttempts bitarray.CompactArray)
SetSubNodeSystemFailures(subNodeSystemFailures bitarray.CompactArray)
SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray)
SetTaskPhaseVersion(taskPhaseVersion uint32)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 19 additions & 7 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
@@ -230,13 +230,14 @@ const (

type ArrayNodeStatus struct {
MutableStruct
Phase ArrayNodePhase `json:"phase,omitempty"`
ExecutionError *core.ExecutionError `json:"executionError,omitempty"`
SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"`
SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"`
SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"`
SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"`
TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"`
Phase ArrayNodePhase `json:"phase,omitempty"`
ExecutionError *core.ExecutionError `json:"executionError,omitempty"`
SubNodePhases bitarray.CompactArray `json:"subphase,omitempty"`
SubNodeTaskPhases bitarray.CompactArray `json:"subtphase,omitempty"`
SubNodeRetryAttempts bitarray.CompactArray `json:"subattempts,omitempty"`
SubNodeSystemFailures bitarray.CompactArray `json:"subsysfailures,omitempty"`
SubNodeDeltaTimestamps bitarray.CompactArray `json:"subtimestamps,omitempty"`
TaskPhaseVersion uint32 `json:"taskPhaseVersion,omitempty"`
}

func (in *ArrayNodeStatus) GetArrayNodePhase() ArrayNodePhase {
@@ -305,6 +306,17 @@ func (in *ArrayNodeStatus) SetSubNodeSystemFailures(subNodeSystemFailures bitarr
}
}

func (in *ArrayNodeStatus) GetSubNodeDeltaTimestamps() bitarray.CompactArray {
return in.SubNodeDeltaTimestamps
}

func (in *ArrayNodeStatus) SetSubNodeDeltaTimestamps(subNodeDeltaTimestamps bitarray.CompactArray) {
if in.SubNodeDeltaTimestamps != subNodeDeltaTimestamps {
in.SetDirty()
in.SubNodeDeltaTimestamps = subNodeDeltaTimestamps
}
}

func (in *ArrayNodeStatus) GetTaskPhaseVersion() uint32 {
return in.TaskPhaseVersion
}
32 changes: 32 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,9 @@ import (
"fmt"
"math"
"strconv"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
@@ -28,6 +31,11 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const (
// value is 3 days of seconds which is covered by 18 bits (262144)
MAX_DELTA_TIMESTAMP = 259200
)

var (
nilLiteral = &idlcore.Literal{
Value: &idlcore.Literal_Scalar{
@@ -254,6 +262,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: maxAttemptsValue},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: maxSystemFailuresValue},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: MAX_DELTA_TIMESTAMP},
} {

*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
@@ -380,6 +389,20 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState.SubNodeRetryAttempts.SetItem(index, uint64(subNodeStatus.GetAttempts()))
arrayNodeState.SubNodeSystemFailures.SetItem(index, uint64(subNodeStatus.GetSystemFailures()))

if arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil {
startedAt := nCtx.NodeStatus().GetLastAttemptStartedAt()
subNodeStartedAt := subNodeStatus.GetLastAttemptStartedAt()
if subNodeStartedAt == nil {
// subNodeStartedAt == nil indicates either (1) node has not started or (2) node status has
// been reset (ex. retryable failure). in both cases we set the delta timestamp to 0
arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, 0)
} else if startedAt != nil && arrayNodeState.SubNodeDeltaTimestamps.GetItem(index) == 0 {
// otherwise if `SubNodeDeltaTimestamps` is unset, we compute the delta and set it
deltaDuration := uint64(subNodeStartedAt.Time.Sub(startedAt.Time).Seconds())
arrayNodeState.SubNodeDeltaTimestamps.SetItem(index, deltaDuration)
}
}

// increment task phase version if subNode phase or task phase changed
if subNodeStatus.GetPhase() != nodeExecutionRequest.nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != nodeExecutionRequest.taskPhase {
incrementTaskPhaseVersion = true
@@ -767,6 +790,14 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
return nil, nil, nil, nil, nil, nil, err
}

// compute start time for subNode using delta timestamp from ArrayNode NodeStatus
var startedAt *metav1.Time
if nCtx.NodeStatus().GetLastAttemptStartedAt() != nil && arrayNodeState.SubNodeDeltaTimestamps.BitSet != nil {
if deltaSeconds := arrayNodeState.SubNodeDeltaTimestamps.GetItem(subNodeIndex); deltaSeconds != 0 {
startedAt = &metav1.Time{Time: nCtx.NodeStatus().GetLastAttemptStartedAt().Add(time.Duration(deltaSeconds) * time.Second)} // #nosec G115
}
}

subNodeStatus := &v1alpha1.NodeStatus{
Phase: nodePhase,
DataDir: subDataDir,
@@ -777,6 +808,7 @@ func (a *arrayNodeHandler) buildArrayNodeContext(ctx context.Context, nCtx inter
Phase: taskPhase,
PluginState: pluginStateBytes,
},
LastAttemptStartedAt: startedAt,
}

// initialize mocks
93 changes: 72 additions & 21 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
@@ -184,9 +186,15 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
nCtx.OnNodeStateWriter().Return(nodeStateWriter)

// NodeStatus
nowMinus := time.Now().Add(time.Duration(-5) * time.Second)
metav1NowMinus := metav1.Time{
Time: nowMinus,
}
nCtx.OnNodeStatus().Return(&v1alpha1.NodeStatus{
DataDir: storage.DataReference("s3://bucket/data"),
OutputDir: storage.DataReference("s3://bucket/output"),
DataDir: storage.DataReference("s3://bucket/data"),
OutputDir: storage.DataReference("s3://bucket/output"),
LastAttemptStartedAt: &metav1NowMinus,
StartedAt: &metav1NowMinus,
})

return nCtx
@@ -252,6 +260,7 @@ func TestAbort(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {

*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
@@ -348,6 +357,7 @@ func TestFinalize(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {
*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
assert.NoError(t, err)
@@ -506,25 +516,27 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
}

tests := []struct {
name string
parallelism *uint32
minSuccessRatio *float32
subNodePhases []v1alpha1.NodePhase
subNodeTaskPhases []core.Phase
subNodeTransitions []handler.Transition
expectedArrayNodePhase v1alpha1.ArrayNodePhase
expectedArrayNodeSubPhases []v1alpha1.NodePhase
expectedTransitionPhase handler.EPhase
expectedExternalResourcePhases []idlcore.TaskExecution_Phase
currentWfParallelism uint32
maxWfParallelism uint32
incrementParallelismCount uint32
useFakeEventRecorder bool
eventRecorderFailures uint32
eventRecorderError error
expectedTaskPhaseVersion uint32
expectHandleError bool
expectedEventingCalls int
name string
parallelism *uint32
minSuccessRatio *float32
subNodePhases []v1alpha1.NodePhase
subNodeTaskPhases []core.Phase
subNodeDeltaTimestamps []uint64
subNodeTransitions []handler.Transition
expectedArrayNodePhase v1alpha1.ArrayNodePhase
expectedArrayNodeSubPhases []v1alpha1.NodePhase
expectedDiffArrayNodeSubDeltaTimestamps []bool
expectedTransitionPhase handler.EPhase
expectedExternalResourcePhases []idlcore.TaskExecution_Phase
currentWfParallelism uint32
maxWfParallelism uint32
incrementParallelismCount uint32
useFakeEventRecorder bool
eventRecorderFailures uint32
eventRecorderError error
expectedTaskPhaseVersion uint32
expectHandleError bool
expectedEventingCalls int
}{
{
name: "StartAllSubNodes",
@@ -827,6 +839,31 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectHandleError: true,
expectedEventingCalls: 1,
},
{
name: "DeltaTimestampUpdates",
parallelism: uint32Ptr(0),
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseRunning,
},
subNodeTaskPhases: []core.Phase{
core.PhaseUndefined,
core.PhaseUndefined,
},
subNodeTransitions: []handler.Transition{
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})),
handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRetryableFailure(idlcore.ExecutionError_SYSTEM, "", "", &handler.ExecutionInfo{})),
},
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedArrayNodeSubPhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseRunning,
v1alpha1.NodePhaseRetryableFailure,
},
expectedTaskPhaseVersion: 1,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_FAILED},
incrementParallelismCount: 1,
},
}

for _, test := range tests {
@@ -858,6 +895,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {
*item.arrayReference, err = bitarray.NewCompactArray(uint(size), bitarray.Item(item.maxValue)) // #nosec G115
assert.NoError(t, err)
@@ -867,6 +905,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
arrayNodeState.SubNodePhases.SetItem(i, bitarray.Item(nodePhase)) // #nosec G115
}

for i, deltaTimestmap := range test.subNodeDeltaTimestamps {
arrayNodeState.SubNodeDeltaTimestamps.SetItem(i, deltaTimestmap) // #nosec G115
}

nodeSpec := arrayNodeSpec
nodeSpec.ArrayNode.Parallelism = test.parallelism
nodeSpec.ArrayNode.MinSuccessRatio = test.minSuccessRatio
@@ -922,6 +964,14 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
assert.Equal(t, expectedPhase, v1alpha1.NodePhase(arrayNodeState.SubNodePhases.GetItem(i))) // #nosec G115
}

for i, expectedDiffDeltaTimestamps := range test.expectedDiffArrayNodeSubDeltaTimestamps {
if expectedDiffDeltaTimestamps {
assert.NotEqual(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i])
} else {
assert.Equal(t, arrayNodeState.SubNodeDeltaTimestamps.GetItem(i), test.subNodeDeltaTimestamps[i])
}
}

bufferedEventRecorder, ok := eventRecorder.(*bufferedEventRecorder)
if ok {
if len(test.expectedExternalResourcePhases) > 0 {
@@ -1301,6 +1351,7 @@ func TestHandleArrayNodePhaseFailing(t *testing.T) {
{arrayReference: &arrayNodeState.SubNodeTaskPhases, maxValue: len(core.Phases) - 1},
{arrayReference: &arrayNodeState.SubNodeRetryAttempts, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeSystemFailures, maxValue: 1},
{arrayReference: &arrayNodeState.SubNodeDeltaTimestamps, maxValue: 1024},
} {
*item.arrayReference, err = bitarray.NewCompactArray(uint(len(test.subNodePhases)), bitarray.Item(item.maxValue)) // #nosec G115
assert.NoError(t, err)
Loading

0 comments on commit 76c7f76

Please sign in to comment.