Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support failure node #4308

Merged
merged 24 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/flyte-core/values-eks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ configmap:
propeller:
resourcemanager:
type: noop
# Note: By default resource manager is disable for propeller, Please use `type: redis` to enaable
# Note: By default resource manager is disabled for propeller, Please use `type: redis` to enable
# type: redis
# resourceMaxQuota: 10000
# redis:
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/pkg/compiler/requirements.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func updateWorkflowRequirements(workflow *core.WorkflowTemplate, subWfs common.W
for _, node := range workflow.Nodes {
updateNodeRequirements(node, subWfs, taskIds, workflowIds, followSubworkflows, errs)
}
if workflow.FailureNode != nil {
updateNodeRequirements(workflow.FailureNode, subWfs, taskIds, workflowIds, followSubworkflows, errs)
}
}

func updateNodeRequirements(node *flyteNode, subWfs common.WorkflowIndex, taskIds, workflowIds common.IdentifierSet,
Expand Down
6 changes: 6 additions & 0 deletions flytepropeller/pkg/compiler/workflow_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile
wf.AddEdges(n, c.EdgeDirectionBidirectional, errs.NewScope())
}

if fg.Template.FailureNode != nil {
failureNode := fg.Template.FailureNode
v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope())
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope())
}

// Add execution edges for orphan nodes that don't have any inward/outward edges.
for nodeID := range wf.Nodes {
if nodeID == c.StartNodeID || nodeID == c.EndNodeID {
Expand Down
93 changes: 93 additions & 0 deletions flytepropeller/pkg/compiler/workflow_compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,99 @@ func ExampleCompileWorkflow_basic() {
// Compile Errors: <nil>
}

func TestCompileWorkflowWithFailureNode(t *testing.T) {
inputWorkflow := &core.WorkflowTemplate{
Id: &core.Identifier{Name: "repo"},
Interface: &core.TypedInterface{
Inputs: createEmptyVariableMap(),
Outputs: createEmptyVariableMap(),
},
Nodes: []*core.Node{
{
Id: "FirstNode",
Target: &core.Node_TaskNode{
TaskNode: &core.TaskNode{
Reference: &core.TaskNode_ReferenceId{
ReferenceId: &core.Identifier{Name: "task_123"},
},
},
},
},
},
FailureNode: &core.Node{
Id: "FailureNode",
Target: &core.Node_TaskNode{
TaskNode: &core.TaskNode{
Reference: &core.TaskNode_ReferenceId{
ReferenceId: &core.Identifier{Name: "cleanup"},
},
},
},
},
}

// Detect what other workflows/tasks does this coreWorkflow reference
subWorkflows := make([]*core.WorkflowTemplate, 0)
reqs, err := GetRequirements(inputWorkflow, subWorkflows)
if err != nil {
fmt.Printf("failed to get requirements. Error: %v", err)
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
return
}

fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n",
strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","),
strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ","))
hamersaw marked this conversation as resolved.
Show resolved Hide resolved

// Replace with logic to satisfy the requirements
workflows := make([]common.InterfaceProvider, 0)
tasks := []*core.TaskTemplate{
{
Id: &core.Identifier{Name: "task_123"},
Interface: &core.TypedInterface{
Inputs: createEmptyVariableMap(),
Outputs: createEmptyVariableMap(),
},
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Image: "image://",
Command: []string{"cmd"},
Args: []string{"args"},
},
},
},
{
Id: &core.Identifier{Name: "cleanup"},
Interface: &core.TypedInterface{
Inputs: createEmptyVariableMap(),
Outputs: createEmptyVariableMap(),
},
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Image: "image://",
Command: []string{"cmd"},
Args: []string{"args"},
},
},
},
}

compiledTasks := make([]*core.CompiledTask, 0, len(tasks))
for _, task := range tasks {
compiledTask, err := CompileTask(task)
if err != nil {
fmt.Printf("failed to compile task [%v]. Error: %v", task.Id, err)
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
return
}

compiledTasks = append(compiledTasks, compiledTask)
}

output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, workflows)
assert.Equal(t, output.Primary.Template.FailureNode.Id, "FailureNode")
assert.NotNil(t, output.Primary.Template.FailureNode.GetTaskNode())
assert.Nil(t, errs)
}

func ExampleCompileWorkflow_inputsOutputsBinding() {
inputWorkflow := &core.WorkflowTemplate{
Id: &core.Identifier{Name: "repo"},
Expand Down
44 changes: 44 additions & 0 deletions flytepropeller/pkg/controller/executors/failure_node_lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package executors

import (
"context"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

type FailureNodeLookup struct {
NodeLookup
FailureNode v1alpha1.ExecutableNode
FailureNodeStatus v1alpha1.ExecutableNodeStatus
}

func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) {
if nodeID == v1alpha1.StartNodeID {
return f.NodeLookup.GetNode(nodeID)
}
return f.FailureNode, true
}

func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus {
if id == v1alpha1.StartNodeID {
return f.NodeLookup.GetNodeExecutionStatus(ctx, id)
}
return f.FailureNodeStatus
}

func (f FailureNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
// The upstream node of the failure node is always the start node
return []v1alpha1.NodeID{v1alpha1.StartNodeID}, nil
}

func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
return nil, nil
}

func NewFailureNodeLookup(nodeLookup NodeLookup, failureNode v1alpha1.ExecutableNode, failureNodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup {
return FailureNodeLookup{
NodeLookup: nodeLookup,
FailureNode: failureNode,
FailureNodeStatus: failureNodeStatus,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package executors

import (
"context"
"testing"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
"github.com/stretchr/testify/assert"
)

type nl struct {
NodeLookup
}

type en struct {
v1alpha1.ExecutableNode
}

type ns struct {
v1alpha1.ExecutableNodeStatus
}

func TestNewFailureNodeLookup(t *testing.T) {
nl := nl{}
en := en{}
ns := ns{}
nodeLoopUp := NewFailureNodeLookup(nl, en, ns)
assert.NotNil(t, nl)
typed := nodeLoopUp.(FailureNodeLookup)
assert.Equal(t, nl, typed.NodeLookup)
assert.Equal(t, en, typed.FailureNode)
assert.Equal(t, ns, typed.FailureNodeStatus)
}

func TestNewTestFailureNodeLookup(t *testing.T) {
n := &mocks.ExecutableNode{}
ns := &mocks.ExecutableNodeStatus{}
failureNodeID := "fn1"
nl := NewTestNodeLookup(
map[string]v1alpha1.ExecutableNode{v1alpha1.StartNodeID: n, failureNodeID: n},
map[string]v1alpha1.ExecutableNodeStatus{v1alpha1.StartNodeID: ns, failureNodeID: ns},
)

assert.NotNil(t, nl)

failureNodeLookup := NewFailureNodeLookup(nl, n, ns)
r, ok := failureNodeLookup.GetNode(v1alpha1.StartNodeID)
assert.True(t, ok)
assert.Equal(t, n, r)
assert.Equal(t, ns, failureNodeLookup.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID))

r, ok = failureNodeLookup.GetNode(failureNodeID)
assert.True(t, ok)
assert.Equal(t, n, r)
assert.Equal(t, ns, failureNodeLookup.GetNodeExecutionStatus(context.TODO(), failureNodeID))

nodeIDs, err := failureNodeLookup.ToNode(failureNodeID)
assert.Equal(t, len(nodeIDs), 1)
assert.Equal(t, nodeIDs[0], v1alpha1.StartNodeID)
assert.Nil(t, err)

nodeIDs, err = failureNodeLookup.FromNode(failureNodeID)
assert.Nil(t, nodeIDs)
assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.No

upstreamNodes, err := dag.ToNode(nodeID)
if err != nil {
return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node")
return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node {%v}", nodeID)
}

skipped := false
Expand Down
10 changes: 5 additions & 5 deletions flytepropeller/pkg/controller/nodes/subworkflow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@
errors.BadSpecificationError, errMsg, nil)), nil
}

updateNodeStateFn := func(transition handler.Transition, newPhase v1alpha1.WorkflowNodePhase, err error) (handler.Transition, error) {
updateNodeStateFn := func(transition handler.Transition, workflowNodeState handler.WorkflowNodeState, err error) (handler.Transition, error) {
if err != nil {
return transition, err
}

workflowNodeState := handler.WorkflowNodeState{Phase: newPhase}
err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState)
if err != nil {
logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error())
Expand All @@ -75,10 +74,10 @@

if wfNode.GetSubWorkflowRef() != nil {
trns, err := w.subWfHandler.StartSubWorkflow(ctx, nCtx)
return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err)
return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err)

Check warning on line 77 in flytepropeller/pkg/controller/nodes/subworkflow/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/handler.go#L77

Added line #L77 was not covered by tests
} else if wfNode.GetLaunchPlanRefID() != nil {
trns, err := w.lpHandler.StartLaunchPlan(ctx, nCtx)
return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err)
return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err)
}

return invalidWFNodeError()
Expand All @@ -95,8 +94,9 @@
}

if wfNode.GetSubWorkflowRef() != nil {
originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error

Check warning on line 97 in flytepropeller/pkg/controller/nodes/subworkflow/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/handler.go#L97

Added line #L97 was not covered by tests
trns, err := w.subWfHandler.HandleFailingSubWorkflow(ctx, nCtx)
return updateNodeStateFn(trns, workflowPhase, err)
return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: workflowPhase, Error: originalError}, err)

Check warning on line 99 in flytepropeller/pkg/controller/nodes/subworkflow/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/handler.go#L99

Added line #L99 was not covered by tests
} else if wfNode.GetLaunchPlanRefID() != nil {
// There is no failure node for launch plans, terminate immediately.
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(wfNodeState.Error, nil)), nil
Expand Down
17 changes: 11 additions & 6 deletions flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,10 @@
func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) {
// The current node would end up becoming the parent for the sub workflow nodes.
// This is done to track the lineage. For level zero, the CreateParentInfo will return nil
newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt())
execContext, err := s.getExecutionContextForDownstream(nCtx)

Check warning on line 67 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L67

Added line #L67 was not covered by tests
if err != nil {
return handler.UnknownTransition, err
}
execContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo)
state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, subworkflow, nl, subworkflow.StartNode())
if err != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err
Expand Down Expand Up @@ -143,17 +142,23 @@

func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) {
originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error
if subworkflow.GetOnFailureNode() != nil {
failureNode := subworkflow.GetOnFailureNode()
if failureNode != nil {

Check warning on line 146 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L145-L146

Added lines #L145 - L146 were not covered by tests
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
execContext, err := s.getExecutionContextForDownstream(nCtx)
if err != nil {
return handler.UnknownTransition, err
}
state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, subworkflow, nl, subworkflow.GetOnFailureNode())
status := nCtx.NodeStatus()
subworkflowNodeLookup := executors.NewNodeLookup(subworkflow, status, subworkflow)
failureNodeStatus := status.GetNodeExecutionStatus(ctx, failureNode.GetID())
failureNodeLookup := executors.NewFailureNodeLookup(subworkflowNodeLookup, failureNode, failureNodeStatus)

state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, failureNodeLookup, failureNodeLookup, failureNode)

Check warning on line 156 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L151-L156

Added lines #L151 - L156 were not covered by tests
if err != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err
}

if state.NodePhase == interfaces.NodePhaseRunning {
if state.NodePhase == interfaces.NodePhaseQueued || state.NodePhase == interfaces.NodePhaseRunning {

Check warning on line 161 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L161

Added line #L161 was not covered by tests
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil
}

Expand All @@ -168,7 +173,7 @@
return handler.UnknownTransition, err
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(originalError, nil)), nil
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil

Check warning on line 176 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L176

Added line #L176 was not covered by tests
}

// When handling the failure node succeeds, the final status will still be failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,36 @@ func TestGetSubWorkflow(t *testing.T) {
assert.Equal(t, swf, w)
})

t.Run("subworkflow with failure node", func(t *testing.T) {

wfNode := &coreMocks.ExecutableWorkflowNode{}
x := "x"
wfNode.OnGetSubWorkflowRef().Return(&x)

node := &coreMocks.ExecutableNode{}
node.OnGetWorkflowNode().Return(wfNode)

ectx := &execMocks.ExecutionContext{}

wfFailureNode := &coreMocks.ExecutableWorkflowNode{}
y := "y"
wfFailureNode.OnGetSubWorkflowRef().Return(&y)
failureNode := &coreMocks.ExecutableNode{}
failureNode.OnGetWorkflowNode().Return(wfFailureNode)

swf := &coreMocks.ExecutableSubWorkflow{}
swf.OnGetOnFailureNode().Return(failureNode)
ectx.OnFindSubWorkflow("x").Return(swf)

nCtx := &mocks.NodeExecutionContext{}
nCtx.OnNode().Return(node)
nCtx.OnExecutionContext().Return(ectx)

w, err := GetSubWorkflow(ctx, nCtx)
assert.NoError(t, err)
assert.Equal(t, swf, w)
})

t.Run("missing-subworkflow", func(t *testing.T) {

wfNode := &coreMocks.ExecutableWorkflowNode{}
Expand Down
Loading
Loading