From 5319a292a4cbb5f4cc6c9e0466898af58c300a1f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 26 Oct 2023 12:46:32 -0700 Subject: [PATCH 01/16] init failure node Signed-off-by: Kevin Su --- charts/flyte-core/values-eks.yaml | 2 +- .../apis/flyteworkflow/v1alpha1/workflow.go | 2 ++ .../pkg/compiler/transformers/k8s/node.go | 4 ++++ .../pkg/controller/executors/node_lookup.go | 2 ++ .../pkg/controller/nodes/executor.go | 3 ++- .../pkg/controller/nodes/node_exec_context.go | 1 + .../pkg/controller/workflow/executor.go | 24 +++++++++++++++---- 7 files changed, 32 insertions(+), 6 deletions(-) diff --git a/charts/flyte-core/values-eks.yaml b/charts/flyte-core/values-eks.yaml index 7b7ca446f5..d5769b5640 100644 --- a/charts/flyte-core/values-eks.yaml +++ b/charts/flyte-core/values-eks.yaml @@ -251,7 +251,7 @@ configmap: propeller: resourcemanager: type: noop - # Note: By default resource manager is disable for propeller, Please use `type: redis` to enaable + # Note: By default resource manager is disabled for propeller, Please use `type: redis` to enable # type: redis # resourceMaxQuota: 10000 # redis: diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 225a49ac3f..d86ac02340 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "github.com/golang/protobuf/jsonpb" "github.com/pkg/errors" @@ -331,6 +332,7 @@ func (in *WorkflowSpec) GetOutputs() *OutputVarMap { } func (in *WorkflowSpec) GetNode(nodeID NodeID) (ExecutableNode, bool) { + fmt.Print("Getting node ", nodeID, " from ", in.Nodes) n, ok := in.Nodes[nodeID] return n, ok } diff --git a/flytepropeller/pkg/compiler/transformers/k8s/node.go b/flytepropeller/pkg/compiler/transformers/k8s/node.go index 2ac06ebd89..26de17f346 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/node.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/node.go @@ -1,6 +1,8 @@ package k8s import ( + "context" + "github.com/flyteorg/flyte/flytestdlib/logger" "strings" "github.com/go-test/deep" @@ -33,7 +35,9 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile if n.GetTaskNode() != nil { taskID := n.GetTaskNode().GetReferenceId().String() // TODO: Use task index for quick lookup + logger.Info(context.Background(), "kevin Looking up task", "taskID", taskID) for _, t := range tasks { + logger.Infof(context.Background(), "kevin Comparing %v with %v", t.Template.Id.String(), taskID) if t.Template.Id.String() == taskID { task = t.Template break diff --git a/flytepropeller/pkg/controller/executors/node_lookup.go b/flytepropeller/pkg/controller/executors/node_lookup.go index 0a714ab4e7..2e82f5920e 100644 --- a/flytepropeller/pkg/controller/executors/node_lookup.go +++ b/flytepropeller/pkg/controller/executors/node_lookup.go @@ -2,6 +2,7 @@ package executors import ( "context" + "fmt" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) @@ -46,6 +47,7 @@ type staticNodeLookup struct { } func (s staticNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { + fmt.Print("staticNodeLookup.GetNode") n, ok := s.nodes[nodeID] return n, ok } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index b9d727db7e..bd96edcd5f 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -180,6 +180,7 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID()) nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) nodePhase := nodeStatus.GetPhase() + logger.Infof(currentNodeCtx, "Handling node [%v] Status [%v]", currentNode.GetID(), nodePhase.String()) if canHandleNode(nodePhase) { // TODO Follow up Pull Request, @@ -989,7 +990,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor if np != nodeStatus.GetPhase() { // assert np == Queued! - logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) + logger.Infof(ctx, "Change in node state detected1 from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) p = p.WithOccuredAt(occurredAt) nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index f42f8b0324..d3029f954f 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -261,6 +261,7 @@ func isAboveInterruptibleFailureThreshold(numFailures uint32, maxAttempts uint32 func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext, nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (interfaces.NodeExecutionContext, error) { + fmt.Printf("-------------- BuildNodeExecutionContext for node [%v] in execution [%v]\n", currentNodeID, executionContext.GetID()) n, ok := nl.GetNode(currentNodeID) if !ok { return nil, fmt.Errorf("failed to find node with ID [%s] in execution [%s]", currentNodeID, executionContext.GetID()) diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index 13957606d1..d92c0cd939 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -170,13 +170,22 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage()) errorNode := w.GetOnFailureNode() + logger.Infof(ctx, "Handling FailureNode [%v]", errorNode) execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, errorNode) + + // TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD + status := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, errorNode.GetID()) + failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), status) + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode) + logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state) + logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err) if err != nil { + logger.Infof(ctx, "test") return StatusFailureNode(execErr), err } if state.HasFailed() { + logger.Infof(ctx, "test1 [%v]", state.Err) return StatusFailed(state.Err), nil } @@ -187,6 +196,8 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl Message: "FailureNode Timed-out"}), nil } + logger.Infof(ctx, "test2") + if state.PartiallyComplete() { // Re-enqueue the workflow c.enqueueWorkflow(w.GetK8sWorkflowID().String()) @@ -220,6 +231,7 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha } errorNode := w.GetOnFailureNode() + logger.Infof(ctx, "Handling xxx FailureNode [%v]", errorNode) if errorNode != nil { return StatusFailureNode(execErr), nil } @@ -282,13 +294,17 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W wfEvent.Phase = core.WorkflowExecution_RUNNING wStatus.UpdatePhase(v1alpha1.WorkflowPhaseRunning, "Workflow Started", nil) wfEvent.OccurredAt = utils.GetProtoTime(wStatus.GetStartedAt()) - case v1alpha1.WorkflowPhaseHandlingFailureNode: - fallthrough case v1alpha1.WorkflowPhaseFailing: wfEvent.Phase = core.WorkflowExecution_FAILING wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError()) wfEvent.OccurredAt = utils.GetProtoTime(nil) + case v1alpha1.WorkflowPhaseHandlingFailureNode: + // TODO: Add core.WorkflowPhaseHandlingFailureNode to proto + wfEvent.Phase = core.WorkflowExecution_FAILING + wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) + wStatus.UpdatePhase(v1alpha1.WorkflowPhaseHandlingFailureNode, "", wfEvent.GetError()) + wfEvent.OccurredAt = utils.GetProtoTime(nil) case v1alpha1.WorkflowPhaseFailed: wfEvent.Phase = core.WorkflowExecution_FAILED wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) @@ -422,7 +438,7 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1. case v1alpha1.WorkflowPhaseHandlingFailureNode: newStatus, err := c.handleFailureNode(ctx, w) if err != nil { - return err + return errors.Errorf("failed to handle failure node for workflow [%s], err: [%s]", w.ID, err.Error()) } failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus) // Ignore ExecutionNotFound and IncompatibleCluster errors to allow graceful failure From e8b678d3bbf34cdccf35027cffde8d5816daccbd Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 14 Nov 2023 12:08:42 -0800 Subject: [PATCH 02/16] wip Signed-off-by: Kevin Su --- flytepropeller/pkg/controller/nodes/predicate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/predicate.go b/flytepropeller/pkg/controller/nodes/predicate.go index 37fe482267..5ec6c26d41 100644 --- a/flytepropeller/pkg/controller/nodes/predicate.go +++ b/flytepropeller/pkg/controller/nodes/predicate.go @@ -51,7 +51,7 @@ func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.No upstreamNodes, err := dag.ToNode(nodeID) if err != nil { - return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node") + return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node {%v}", nodeID) } skipped := false From 8454aa9458a37ace73a45438aafdda21a1fb5bda Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 15 Nov 2023 14:55:01 -0800 Subject: [PATCH 03/16] wip Signed-off-by: Kevin Su --- flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go | 1 + flytepropeller/pkg/compiler/transformers/k8s/workflow.go | 1 + flytepropeller/pkg/compiler/workflow_compiler.go | 5 ++++- flytepropeller/pkg/controller/nodes/executor.go | 1 + 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index b3d744bd77..e345801c9d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -46,6 +46,7 @@ const ( NodeKindWorkflow NodeKind = "workflow" // Either an inline workflow or a remote workflow definition NodeKindGate NodeKind = "gate" // A Gate node with a condition NodeKindArray NodeKind = "array" // An array node with a subtask Node + NodeKindFailure NodeKind = "failure" // A failure node with a subtask Node NodeKindStart NodeKind = "start" // Start node is a special node NodeKindEnd NodeKind = "end" ) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 2421ddf9bb..e0541ed765 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -92,6 +92,7 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas return nil, errs } failureN = nodes[0] + // failureN.Kind = v1alpha1.NodeKindFailure } nodes, _ := buildNodes(wf.Template.GetNodes(), tasks, errs.NewScope()) diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index 07fc7bf938..25e2ebf437 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -187,8 +187,11 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile return nil, !errs.HasErrors() } - checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes)) + checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes)) // nodes in the workflow + failure node checkpoint = append(checkpoint, fg.Template.Nodes...) + //if fg.Template.FailureNode != nil { + // checkpoint = append(checkpoint, fg.Template.FailureNode) + //} fg.Template.Nodes = make([]*core.Node, 0, len(fg.Template.Nodes)) wf.GetCoreWorkflow().Connections = &core.ConnectionSet{ Downstream: make(map[string]*core.ConnectionSet_IdList), diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index bd96edcd5f..e2f606541d 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -266,6 +266,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex Kind: core.ExecutionError_SYSTEM, }), nil } + logger.Debugf(ctx, "Downstream nodes [%v]", downstreamNodes) if len(downstreamNodes) == 0 { logger.Debugf(ctx, "No downstream nodes found. Complete.") return interfaces.NodeStatusComplete, nil From 97979d9d0ce44d3d1b97838338170d8f6001bca0 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 15 Nov 2023 15:19:36 -0800 Subject: [PATCH 04/16] wip Signed-off-by: Kevin Su --- .../pkg/compiler/transformers/k8s/workflow.go | 10 ++++++ .../executors/failure_node_lookup.go | 34 +++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 flytepropeller/pkg/controller/executors/failure_node_lookup.go diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index e0541ed765..777c320f90 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -87,6 +87,16 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas var failureN *v1alpha1.NodeSpec if n := wf.Template.GetFailureNode(); n != nil { + for _, in := range n.Inputs { + in.Binding = &core.BindingData{ + Value: &core.BindingData_Promise{ + Promise: &core.OutputReference{ + Var: in.Binding.GetPromise().GetVar(), + NodeId: v1alpha1.StartNodeID, + }, + }, + } + } nodes, ok := buildNodeSpec(n, tasks, errs.NewScope()) if !ok { return nil, errs diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go new file mode 100644 index 0000000000..e169f6fc27 --- /dev/null +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -0,0 +1,34 @@ +package executors + +import ( + "context" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" +) + +type FailureNodeLookup struct { + NodeSpec *v1alpha1.NodeSpec + NodeStatus v1alpha1.ExecutableNodeStatus +} + +func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { + return f.NodeSpec, true +} + +func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus { + return f.NodeStatus +} + +func (f FailureNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) { + return []v1alpha1.NodeID{v1alpha1.StartNodeID}, nil +} + +func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) { + return nil, nil +} + +func NewFailureNodeLookup(nodeSpec *v1alpha1.NodeSpec, nodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup { + return FailureNodeLookup{ + NodeSpec: nodeSpec, + NodeStatus: nodeStatus, + } +} From 673079771197062ecd0c97aabb40f0035ef19d17 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Wed, 15 Nov 2023 15:32:03 -0800 Subject: [PATCH 05/16] trying to make it work! Signed-off-by: Ketan Umare --- .../executors/failure_node_lookup.go | 22 ++++++++++++++----- .../pkg/controller/workflow/executor.go | 3 +-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go index e169f6fc27..15777d5582 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -6,15 +6,23 @@ import ( ) type FailureNodeLookup struct { - NodeSpec *v1alpha1.NodeSpec - NodeStatus v1alpha1.ExecutableNodeStatus + NodeSpec *v1alpha1.NodeSpec + NodeStatus v1alpha1.ExecutableNodeStatus + StartNode v1alpha1.ExecutableNode + StartNodeStatus v1alpha1.ExecutableNodeStatus } func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { + if nodeID == v1alpha1.StartNodeID { + return f.StartNode, true + } return f.NodeSpec, true } func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus { + if id == v1alpha1.StartNodeID { + return f.StartNodeStatus + } return f.NodeStatus } @@ -26,9 +34,13 @@ func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, erro return nil, nil } -func NewFailureNodeLookup(nodeSpec *v1alpha1.NodeSpec, nodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup { +func NewFailureNodeLookup(nodeSpec *v1alpha1.NodeSpec, startNode v1alpha1.ExecutableNode, nodeStatusGetter v1alpha1.NodeStatusGetter) NodeLookup { + startNodeStatus := nodeStatusGetter.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID) + errNodeStatus := nodeStatusGetter.GetNodeExecutionStatus(context.TODO(), nodeSpec.GetID()) return FailureNodeLookup{ - NodeSpec: nodeSpec, - NodeStatus: nodeStatus, + NodeSpec: nodeSpec, + NodeStatus: errNodeStatus, + StartNode: startNode, + StartNodeStatus: startNodeStatus, } } diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index d92c0cd939..f782c30d0a 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -174,8 +174,7 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) // TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD - status := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, errorNode.GetID()) - failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), status) + failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), w.GetNode(v1alpha1.StartNodeID), w.GetExecutionStatus()) state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode) logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state) logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err) From 3fb61f388115e3a2add975e6aa644f434a05b00e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 15 Nov 2023 19:20:29 -0800 Subject: [PATCH 06/16] wip Signed-off-by: Kevin Su --- .../executors/failure_node_lookup.go | 29 +++++++++---------- .../nodes/subworkflow/subworkflow.go | 10 +++++-- .../pkg/controller/workflow/executor.go | 16 ++++------ 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go index 15777d5582..a517ba7ead 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -6,41 +6,38 @@ import ( ) type FailureNodeLookup struct { - NodeSpec *v1alpha1.NodeSpec - NodeStatus v1alpha1.ExecutableNodeStatus - StartNode v1alpha1.ExecutableNode - StartNodeStatus v1alpha1.ExecutableNodeStatus + NodeLookup + FailureNode v1alpha1.ExecutableNode + FailureNodeStatus v1alpha1.ExecutableNodeStatus } func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { if nodeID == v1alpha1.StartNodeID { - return f.StartNode, true + return f.NodeLookup.GetNode(nodeID) } - return f.NodeSpec, true + return f.FailureNode, true } func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus { if id == v1alpha1.StartNodeID { - return f.StartNodeStatus + return f.NodeLookup.GetNodeExecutionStatus(ctx, id) } - return f.NodeStatus + return f.FailureNodeStatus } func (f FailureNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) { + // The upstream node of the failure node is always the start node return []v1alpha1.NodeID{v1alpha1.StartNodeID}, nil } func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) { - return nil, nil + return []v1alpha1.NodeID{v1alpha1.EndNodeID}, nil } -func NewFailureNodeLookup(nodeSpec *v1alpha1.NodeSpec, startNode v1alpha1.ExecutableNode, nodeStatusGetter v1alpha1.NodeStatusGetter) NodeLookup { - startNodeStatus := nodeStatusGetter.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID) - errNodeStatus := nodeStatusGetter.GetNodeExecutionStatus(context.TODO(), nodeSpec.GetID()) +func NewFailureNodeLookup(nodeLookup NodeLookup, failureNode v1alpha1.ExecutableNode, failureNodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup { return FailureNodeLookup{ - NodeSpec: nodeSpec, - NodeStatus: errNodeStatus, - StartNode: startNode, - StartNodeStatus: startNodeStatus, + NodeLookup: nodeLookup, + FailureNode: failureNode, + FailureNodeStatus: failureNodeStatus, } } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index 10e48358dd..6ea22b9637 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -143,12 +143,18 @@ func (s *subworkflowHandler) getExecutionContextForDownstream(nCtx interfaces.No func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error - if subworkflow.GetOnFailureNode() != nil { + failureNode := subworkflow.GetOnFailureNode() + if failureNode != nil { execContext, err := s.getExecutionContextForDownstream(nCtx) if err != nil { return handler.UnknownTransition, err } - state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, subworkflow, nl, subworkflow.GetOnFailureNode()) + subNodeLookup := nCtx.ContextualNodeLookup() + // TODO: NodeStatus() is deprecated, how do we get the status of the failure node? + failureNodeStatus := nCtx.NodeStatus().GetNodeExecutionStatus(ctx, failureNode.GetID()) + failureNodeLookup := executors.NewFailureNodeLookup(subNodeLookup, failureNode, failureNodeStatus) + + state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, failureNodeLookup, failureNodeLookup, failureNode) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index f782c30d0a..40fb31e46d 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -169,22 +169,18 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage()) - errorNode := w.GetOnFailureNode() - logger.Infof(ctx, "Handling FailureNode [%v]", errorNode) + failureNode := w.GetOnFailureNode() + logger.Infof(ctx, "Handling FailureNode [%v]", failureNode.GetID()) execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) - // TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD - failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), w.GetNode(v1alpha1.StartNodeID), w.GetExecutionStatus()) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode) - logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state) - logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err) + failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID()) + failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus) + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode) if err != nil { - logger.Infof(ctx, "test") return StatusFailureNode(execErr), err } if state.HasFailed() { - logger.Infof(ctx, "test1 [%v]", state.Err) return StatusFailed(state.Err), nil } @@ -195,8 +191,6 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl Message: "FailureNode Timed-out"}), nil } - logger.Infof(ctx, "test2") - if state.PartiallyComplete() { // Re-enqueue the workflow c.enqueueWorkflow(w.GetK8sWorkflowID().String()) From 5d0c2b2f4dc5ead2e56d66362fa6a631c847bcbb Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 15 Nov 2023 19:34:45 -0800 Subject: [PATCH 07/16] wip Signed-off-by: Kevin Su --- .../pkg/controller/workflow/executor.go | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index f782c30d0a..4a48a3d7f0 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -174,8 +174,9 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) // TODO: GetNodeExecutionStatus doesn't work. How do we get the error node status from CRD - failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), w.GetNode(v1alpha1.StartNodeID), w.GetExecutionStatus()) - state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, failureNodeLookup, errorNode) + startNode, _ := w.GetNode(v1alpha1.StartNodeID) + failureNodeLookup := executors.NewFailureNodeLookup(errorNode.(*v1alpha1.NodeSpec), startNode, w.GetExecutionStatus()) + state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, errorNode) logger.Infof(ctx, "FailureNode [%v] finished with state [%v]", errorNode, state) logger.Infof(ctx, "FailureNode [%v] finished with error [%v]", errorNode, err) if err != nil { @@ -183,26 +184,26 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl return StatusFailureNode(execErr), err } - if state.HasFailed() { - logger.Infof(ctx, "test1 [%v]", state.Err) + switch state.NodePhase { + case interfaces.NodePhaseFailed: return StatusFailed(state.Err), nil - } - - if state.HasTimedOut() { + case interfaces.NodePhaseTimedOut: return StatusFailed(&core.ExecutionError{ Kind: core.ExecutionError_USER, Code: "TimedOut", Message: "FailureNode Timed-out"}), nil - } - - logger.Infof(ctx, "test2") - - if state.PartiallyComplete() { + case interfaces.NodePhaseQueued: + fallthrough + case interfaces.NodePhaseRunning: + fallthrough + case interfaces.NodePhaseSuccess: // Re-enqueue the workflow c.enqueueWorkflow(w.GetK8sWorkflowID().String()) return StatusFailureNode(execErr), nil } + logger.Infof(ctx, "test2") + // If the failure node finished executing, transition to failed. return StatusFailed(execErr), nil } From 65cfaba5e87e98ae7f6343c05ff90f56c7100d11 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 15 Nov 2023 19:46:35 -0800 Subject: [PATCH 08/16] lint Signed-off-by: Kevin Su --- flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go | 1 - flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go | 3 --- flytepropeller/pkg/compiler/transformers/k8s/node.go | 4 ---- flytepropeller/pkg/compiler/transformers/k8s/workflow.go | 1 - flytepropeller/pkg/compiler/workflow_compiler.go | 5 +---- .../pkg/controller/executors/failure_node_lookup.go | 2 +- flytepropeller/pkg/controller/executors/node_lookup.go | 3 --- flytepropeller/pkg/controller/nodes/executor.go | 3 --- flytepropeller/pkg/controller/nodes/node_exec_context.go | 1 - flytepropeller/pkg/controller/workflow/executor.go | 3 +-- 10 files changed, 3 insertions(+), 23 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index e345801c9d..b3d744bd77 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -46,7 +46,6 @@ const ( NodeKindWorkflow NodeKind = "workflow" // Either an inline workflow or a remote workflow definition NodeKindGate NodeKind = "gate" // A Gate node with a condition NodeKindArray NodeKind = "array" // An array node with a subtask Node - NodeKindFailure NodeKind = "failure" // A failure node with a subtask Node NodeKindStart NodeKind = "start" // Start node is a special node NodeKindEnd NodeKind = "end" ) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go index d86ac02340..3216c4c614 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "encoding/json" - "fmt" - "github.com/golang/protobuf/jsonpb" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -332,7 +330,6 @@ func (in *WorkflowSpec) GetOutputs() *OutputVarMap { } func (in *WorkflowSpec) GetNode(nodeID NodeID) (ExecutableNode, bool) { - fmt.Print("Getting node ", nodeID, " from ", in.Nodes) n, ok := in.Nodes[nodeID] return n, ok } diff --git a/flytepropeller/pkg/compiler/transformers/k8s/node.go b/flytepropeller/pkg/compiler/transformers/k8s/node.go index 26de17f346..2ac06ebd89 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/node.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/node.go @@ -1,8 +1,6 @@ package k8s import ( - "context" - "github.com/flyteorg/flyte/flytestdlib/logger" "strings" "github.com/go-test/deep" @@ -35,9 +33,7 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile if n.GetTaskNode() != nil { taskID := n.GetTaskNode().GetReferenceId().String() // TODO: Use task index for quick lookup - logger.Info(context.Background(), "kevin Looking up task", "taskID", taskID) for _, t := range tasks { - logger.Infof(context.Background(), "kevin Comparing %v with %v", t.Template.Id.String(), taskID) if t.Template.Id.String() == taskID { task = t.Template break diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 777c320f90..a9ff8b83d7 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -102,7 +102,6 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas return nil, errs } failureN = nodes[0] - // failureN.Kind = v1alpha1.NodeKindFailure } nodes, _ := buildNodes(wf.Template.GetNodes(), tasks, errs.NewScope()) diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index 25e2ebf437..07fc7bf938 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -187,11 +187,8 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile return nil, !errs.HasErrors() } - checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes)) // nodes in the workflow + failure node + checkpoint := make([]*core.Node, 0, len(fg.Template.Nodes)) checkpoint = append(checkpoint, fg.Template.Nodes...) - //if fg.Template.FailureNode != nil { - // checkpoint = append(checkpoint, fg.Template.FailureNode) - //} fg.Template.Nodes = make([]*core.Node, 0, len(fg.Template.Nodes)) wf.GetCoreWorkflow().Connections = &core.ConnectionSet{ Downstream: make(map[string]*core.ConnectionSet_IdList), diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go index a517ba7ead..12aa394ea8 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -31,7 +31,7 @@ func (f FailureNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) } func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) { - return []v1alpha1.NodeID{v1alpha1.EndNodeID}, nil + return nil, nil } func NewFailureNodeLookup(nodeLookup NodeLookup, failureNode v1alpha1.ExecutableNode, failureNodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup { diff --git a/flytepropeller/pkg/controller/executors/node_lookup.go b/flytepropeller/pkg/controller/executors/node_lookup.go index 2e82f5920e..6079c80f60 100644 --- a/flytepropeller/pkg/controller/executors/node_lookup.go +++ b/flytepropeller/pkg/controller/executors/node_lookup.go @@ -2,8 +2,6 @@ package executors import ( "context" - "fmt" - "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) @@ -47,7 +45,6 @@ type staticNodeLookup struct { } func (s staticNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { - fmt.Print("staticNodeLookup.GetNode") n, ok := s.nodes[nodeID] return n, ok } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index e2f606541d..24fac268d8 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -180,7 +180,6 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID()) nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) nodePhase := nodeStatus.GetPhase() - logger.Infof(currentNodeCtx, "Handling node [%v] Status [%v]", currentNode.GetID(), nodePhase.String()) if canHandleNode(nodePhase) { // TODO Follow up Pull Request, @@ -266,7 +265,6 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex Kind: core.ExecutionError_SYSTEM, }), nil } - logger.Debugf(ctx, "Downstream nodes [%v]", downstreamNodes) if len(downstreamNodes) == 0 { logger.Debugf(ctx, "No downstream nodes found. Complete.") return interfaces.NodeStatusComplete, nil @@ -991,7 +989,6 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor if np != nodeStatus.GetPhase() { // assert np == Queued! - logger.Infof(ctx, "Change in node state detected1 from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) p = p.WithOccuredAt(occurredAt) nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index d3029f954f..f42f8b0324 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -261,7 +261,6 @@ func isAboveInterruptibleFailureThreshold(numFailures uint32, maxAttempts uint32 func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionContext executors.ExecutionContext, nl executors.NodeLookup, currentNodeID v1alpha1.NodeID) (interfaces.NodeExecutionContext, error) { - fmt.Printf("-------------- BuildNodeExecutionContext for node [%v] in execution [%v]\n", currentNodeID, executionContext.GetID()) n, ok := nl.GetNode(currentNodeID) if !ok { return nil, fmt.Errorf("failed to find node with ID [%s] in execution [%s]", currentNodeID, executionContext.GetID()) diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index 2b0b850905..c88f655673 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -225,7 +225,6 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha } errorNode := w.GetOnFailureNode() - logger.Infof(ctx, "Handling xxx FailureNode [%v]", errorNode) if errorNode != nil { return StatusFailureNode(execErr), nil } @@ -294,7 +293,7 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError()) wfEvent.OccurredAt = utils.GetProtoTime(nil) case v1alpha1.WorkflowPhaseHandlingFailureNode: - // TODO: Add core.WorkflowPhaseHandlingFailureNode to proto + // TODO: Add core.WorkflowPhaseHandlingFailureNode to idl? wfEvent.Phase = core.WorkflowExecution_FAILING wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) wStatus.UpdatePhase(v1alpha1.WorkflowPhaseHandlingFailureNode, "", wfEvent.GetError()) From 1a3d6f28be1dd27f07fce867dc21a5779c3be301 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 15 Nov 2023 19:54:38 -0800 Subject: [PATCH 09/16] lint Signed-off-by: Kevin Su --- flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go | 1 + flytepropeller/pkg/controller/executors/node_lookup.go | 1 + flytepropeller/pkg/controller/nodes/executor.go | 1 + 3 files changed, 3 insertions(+) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 3216c4c614..225a49ac3f 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "github.com/golang/protobuf/jsonpb" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/flytepropeller/pkg/controller/executors/node_lookup.go b/flytepropeller/pkg/controller/executors/node_lookup.go index 6079c80f60..0a714ab4e7 100644 --- a/flytepropeller/pkg/controller/executors/node_lookup.go +++ b/flytepropeller/pkg/controller/executors/node_lookup.go @@ -2,6 +2,7 @@ package executors import ( "context" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 24fac268d8..b9d727db7e 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -989,6 +989,7 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor if np != nodeStatus.GetPhase() { // assert np == Queued! + logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) p = p.WithOccuredAt(occurredAt) nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), From 040745e6cc2cec4646817ffb5e7a84679f34512a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 16 Nov 2023 17:06:54 -0800 Subject: [PATCH 10/16] more tests Signed-off-by: Kevin Su --- .../executors/failure_node_lookup.go | 1 + .../executors/failure_node_lookup_test.go | 57 +++++++++++++++++++ .../nodes/subworkflow/subworkflow_test.go | 30 ++++++++++ .../pkg/controller/workflow/executor.go | 4 +- .../pkg/controller/workflow/executor_test.go | 4 +- .../workflow/testdata/benchmark_wf.yaml | 50 ++++++++++++++++ 6 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 flytepropeller/pkg/controller/executors/failure_node_lookup_test.go diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go index 12aa394ea8..0c61540259 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -2,6 +2,7 @@ package executors import ( "context" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go new file mode 100644 index 0000000000..27193603d3 --- /dev/null +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go @@ -0,0 +1,57 @@ +package executors + +import ( + "context" + "testing" + + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + "github.com/stretchr/testify/assert" +) + +type nl struct { + NodeLookup +} + +type en struct { + v1alpha1.ExecutableNode +} + +type ns struct { + v1alpha1.ExecutableNodeStatus +} + +func TestNewFailureNodeLookup(t *testing.T) { + nl := nl{} + en := en{} + ns := ns{} + nodeLoopUp := NewFailureNodeLookup(nl, en, ns) + assert.NotNil(t, nl) + typed := nodeLoopUp.(FailureNodeLookup) + assert.Equal(t, nl, typed.NodeLookup) + assert.Equal(t, en, typed.FailureNode) + assert.Equal(t, ns, typed.FailureNodeStatus) +} + +func TestNewTestFailureNodeLookup(t *testing.T) { + n := &mocks.ExecutableNode{} + ns := &mocks.ExecutableNodeStatus{} + failureNodeID := "fn1" + nl := NewTestNodeLookup( + map[string]v1alpha1.ExecutableNode{v1alpha1.StartNodeID: n, failureNodeID: n}, + map[string]v1alpha1.ExecutableNodeStatus{v1alpha1.StartNodeID: ns, failureNodeID: ns}, + ) + + assert.NotNil(t, nl) + + failureNodeLookup := NewFailureNodeLookup(nl, n, ns) + r, ok := failureNodeLookup.GetNode(v1alpha1.StartNodeID) + assert.True(t, ok) + assert.Equal(t, n, r) + assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID)) + + r, ok = failureNodeLookup.GetNode(failureNodeID) + assert.True(t, ok) + assert.Equal(t, n, r) + assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), failureNodeID)) +} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go index f2391048af..dc00efacdc 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow_test.go @@ -41,6 +41,36 @@ func TestGetSubWorkflow(t *testing.T) { assert.Equal(t, swf, w) }) + t.Run("subworkflow with failure node", func(t *testing.T) { + + wfNode := &coreMocks.ExecutableWorkflowNode{} + x := "x" + wfNode.OnGetSubWorkflowRef().Return(&x) + + node := &coreMocks.ExecutableNode{} + node.OnGetWorkflowNode().Return(wfNode) + + ectx := &execMocks.ExecutionContext{} + + wfFailureNode := &coreMocks.ExecutableWorkflowNode{} + y := "y" + wfFailureNode.OnGetSubWorkflowRef().Return(&y) + failureNode := &coreMocks.ExecutableNode{} + failureNode.OnGetWorkflowNode().Return(wfFailureNode) + + swf := &coreMocks.ExecutableSubWorkflow{} + swf.OnGetOnFailureNode().Return(failureNode) + ectx.OnFindSubWorkflow("x").Return(swf) + + nCtx := &mocks.NodeExecutionContext{} + nCtx.OnNode().Return(node) + nCtx.OnExecutionContext().Return(ectx) + + w, err := GetSubWorkflow(ctx, nCtx) + assert.NoError(t, err) + assert.Equal(t, swf, w) + }) + t.Run("missing-subworkflow", func(t *testing.T) { wfNode := &coreMocks.ExecutableWorkflowNode{} diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index c88f655673..d3cbfb71b3 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -224,8 +224,8 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha return StatusFailing(execErr), err } - errorNode := w.GetOnFailureNode() - if errorNode != nil { + failureNode := w.GetOnFailureNode() + if failureNode != nil { return StatusFailureNode(execErr), nil } diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index bded6f4126..0bbda87c31 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -520,7 +520,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { if assert.NoError(t, json.Unmarshal(wJSON, w)) { // For benchmark workflow, we will run into the first failure on round 6 - roundsToFail := 7 + roundsToFail := 8 for i := 0; i < roundsToFail; i++ { err := executor.HandleFlyteWorkflow(ctx, w) assert.Nil(t, err, "Round [%v]", i) @@ -534,6 +534,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { if i == roundsToFail-1 { assert.Equal(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase) + } else if i == roundsToFail-2 { + assert.Equal(t, v1alpha1.WorkflowPhaseHandlingFailureNode, w.Status.Phase) } else { assert.NotEqual(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase, "For Round [%v] got phase [%v]", i, w.Status.Phase.String()) } diff --git a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml index d53059b330..1ea8b9abbc 100644 --- a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml +++ b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml @@ -205,6 +205,22 @@ spec: status: phase: 0 task: sum-non-none + onFailure: + id: en0 + inputBindings: + - binding: + promise: + nodeId: start-node + var: triggered_date + var: triggered_date + kind: task + name: delete-cluster + resources: + requests: + cpu: "2" + memory: 2Gi + status: + phase: 0 status: phase: 0 tasks: @@ -290,6 +306,40 @@ tasks: version: 1.19.0b10 timeout: 0s type: "7" + delete-cluster: + container: + args: + - --task-module=flytekit.examples.tasks + - --task-name=print_every_time + - --inputs={{$input}} + - --output-prefix={{$output}} + command: + - flyte-python-entrypoint + image: myflytecontainer:abc123 + resources: + requests: + - name: 1 + value: "2.000" + - name: 3 + value: 2048Mi + - name: 2 + value: "0.000" + id: + name: delete-cluster + interface: + inputs: + variables: + date_triggered: + type: + simple: DATETIME + outputs: + variables: { } + metadata: + runtime: + type: 1 + version: 1.19.0b10 + timeout: 0s + type: "7" sum-and-print: container: args: From 80c1c59bb99bed15856b7359901f106083a3ed15 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 16 Nov 2023 20:47:05 -0800 Subject: [PATCH 11/16] compile failure node Signed-off-by: Kevin Su --- flytepropeller/pkg/compiler/requirements.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytepropeller/pkg/compiler/requirements.go b/flytepropeller/pkg/compiler/requirements.go index 59243554d3..14b1778e38 100644 --- a/flytepropeller/pkg/compiler/requirements.go +++ b/flytepropeller/pkg/compiler/requirements.go @@ -60,6 +60,9 @@ func updateWorkflowRequirements(workflow *core.WorkflowTemplate, subWfs common.W for _, node := range workflow.Nodes { updateNodeRequirements(node, subWfs, taskIds, workflowIds, followSubworkflows, errs) } + if workflow.FailureNode != nil { + updateNodeRequirements(workflow.FailureNode, subWfs, taskIds, workflowIds, followSubworkflows, errs) + } } func updateNodeRequirements(node *flyteNode, subWfs common.WorkflowIndex, taskIds, workflowIds common.IdentifierSet, From 8e49b3fa264219bd2550096c06be2dc17d2d84ad Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 28 Nov 2023 14:44:07 -0800 Subject: [PATCH 12/16] update Signed-off-by: Kevin Su --- .../pkg/compiler/transformers/k8s/workflow.go | 10 ---------- flytepropeller/pkg/compiler/workflow_compiler.go | 5 +++++ .../pkg/controller/executors/failure_node_lookup.go | 6 +++--- .../pkg/controller/nodes/subworkflow/subworkflow.go | 11 ++++++----- 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index a9ff8b83d7..2421ddf9bb 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -87,16 +87,6 @@ func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTas var failureN *v1alpha1.NodeSpec if n := wf.Template.GetFailureNode(); n != nil { - for _, in := range n.Inputs { - in.Binding = &core.BindingData{ - Value: &core.BindingData_Promise{ - Promise: &core.OutputReference{ - Var: in.Binding.GetPromise().GetVar(), - NodeId: v1alpha1.StartNodeID, - }, - }, - } - } nodes, ok := buildNodeSpec(n, tasks, errs.NewScope()) if !ok { return nil, errs diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index 07fc7bf938..98658f8ad6 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -216,6 +216,9 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile } } + failureNode := fg.Template.FailureNode + v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope()) + // Add explicitly and implicitly declared edges for nodeID, n := range wf.Nodes { if nodeID == c.StartNodeID { @@ -225,6 +228,8 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile wf.AddEdges(n, c.EdgeDirectionBidirectional, errs.NewScope()) } + wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope()) + // Add execution edges for orphan nodes that don't have any inward/outward edges. for nodeID := range wf.Nodes { if nodeID == c.StartNodeID || nodeID == c.EndNodeID { diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go index 0c61540259..a67e4b57ad 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -13,14 +13,14 @@ type FailureNodeLookup struct { } func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { - if nodeID == v1alpha1.StartNodeID { - return f.NodeLookup.GetNode(nodeID) + if node, ok := f.NodeLookup.GetNode(nodeID); ok { + return node, true } return f.FailureNode, true } func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus { - if id == v1alpha1.StartNodeID { + if _, ok := f.NodeLookup.GetNode(id); ok { return f.NodeLookup.GetNodeExecutionStatus(ctx, id) } return f.FailureNodeStatus diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index 6ea22b9637..687cad9eee 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -64,11 +64,10 @@ func (s *subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { // The current node would end up becoming the parent for the sub workflow nodes. // This is done to track the lineage. For level zero, the CreateParentInfo will return nil - newParentInfo, err := common.CreateParentInfo(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID(), nCtx.CurrentAttempt()) + execContext, err := s.getExecutionContextForDownstream(nCtx) if err != nil { return handler.UnknownTransition, err } - execContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo) state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, subworkflow, nl, subworkflow.StartNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err @@ -149,10 +148,12 @@ func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, if err != nil { return handler.UnknownTransition, err } - subNodeLookup := nCtx.ContextualNodeLookup() + status := nCtx.NodeStatus() + subworkflowNodeLookup := executors.NewNodeLookup(subworkflow, status, subworkflow) + // subNodeLookup := nCtx.ContextualNodeLookup() // TODO: NodeStatus() is deprecated, how do we get the status of the failure node? - failureNodeStatus := nCtx.NodeStatus().GetNodeExecutionStatus(ctx, failureNode.GetID()) - failureNodeLookup := executors.NewFailureNodeLookup(subNodeLookup, failureNode, failureNodeStatus) + failureNodeStatus := status.GetNodeExecutionStatus(ctx, failureNode.GetID()) + failureNodeLookup := executors.NewFailureNodeLookup(subworkflowNodeLookup, failureNode, failureNodeStatus) state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, failureNodeLookup, failureNodeLookup, failureNode) if err != nil { From a805168e65a66e998ff326f1ad9901d623022770 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 29 Nov 2023 15:36:43 -0800 Subject: [PATCH 13/16] Create a failure node in subworkflow Signed-off-by: Kevin Su --- flyte-single-binary-local.yaml | 7 ++++++- flytepropeller/pkg/compiler/workflow_compiler.go | 9 +++++---- .../pkg/controller/nodes/subworkflow/handler.go | 10 +++++----- .../pkg/controller/nodes/subworkflow/subworkflow.go | 4 ++-- flytepropeller/pkg/controller/nodes/transformers.go | 2 ++ flytepropeller/pkg/controller/workflow/executor.go | 2 +- 6 files changed, 21 insertions(+), 13 deletions(-) diff --git a/flyte-single-binary-local.yaml b/flyte-single-binary-local.yaml index 87bacdd5fc..0b04762216 100644 --- a/flyte-single-binary-local.yaml +++ b/flyte-single-binary-local.yaml @@ -15,7 +15,7 @@ cluster_resources: logger: show-source: true - level: 6 + level: 4 propeller: create-flyteworkflow-crd: true @@ -83,3 +83,8 @@ storage: access_key_id: minio secret_key: miniostorage container: my-s3-bucket + + +flyte: + propeller: + disableWebhook: true diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index 98658f8ad6..32b3d4d13d 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -216,9 +216,6 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile } } - failureNode := fg.Template.FailureNode - v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope()) - // Add explicitly and implicitly declared edges for nodeID, n := range wf.Nodes { if nodeID == c.StartNodeID { @@ -228,7 +225,11 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile wf.AddEdges(n, c.EdgeDirectionBidirectional, errs.NewScope()) } - wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope()) + if fg.Template.FailureNode != nil { + failureNode := fg.Template.FailureNode + v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope()) + wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope()) + } // Add execution edges for orphan nodes that don't have any inward/outward edges. for nodeID := range wf.Nodes { diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/handler.go b/flytepropeller/pkg/controller/nodes/subworkflow/handler.go index 5f9f16d206..e7cffeeea1 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/handler.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/handler.go @@ -49,12 +49,11 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx errors.BadSpecificationError, errMsg, nil)), nil } - updateNodeStateFn := func(transition handler.Transition, newPhase v1alpha1.WorkflowNodePhase, err error) (handler.Transition, error) { + updateNodeStateFn := func(transition handler.Transition, workflowNodeState handler.WorkflowNodeState, err error) (handler.Transition, error) { if err != nil { return transition, err } - workflowNodeState := handler.WorkflowNodeState{Phase: newPhase} err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) if err != nil { logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error()) @@ -75,10 +74,10 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx if wfNode.GetSubWorkflowRef() != nil { trns, err := w.subWfHandler.StartSubWorkflow(ctx, nCtx) - return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err) + return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err) } else if wfNode.GetLaunchPlanRefID() != nil { trns, err := w.lpHandler.StartLaunchPlan(ctx, nCtx) - return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err) + return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err) } return invalidWFNodeError() @@ -95,8 +94,9 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx } if wfNode.GetSubWorkflowRef() != nil { + originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error trns, err := w.subWfHandler.HandleFailingSubWorkflow(ctx, nCtx) - return updateNodeStateFn(trns, workflowPhase, err) + return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: workflowPhase, Error: originalError}, err) } else if wfNode.GetLaunchPlanRefID() != nil { // There is no failure node for launch plans, terminate immediately. return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(wfNodeState.Error, nil)), nil diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index 687cad9eee..e7c8c00cd1 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -160,7 +160,7 @@ func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } - if state.NodePhase == interfaces.NodePhaseRunning { + if state.NodePhase == interfaces.NodePhaseQueued || state.NodePhase == interfaces.NodePhaseRunning { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } @@ -175,7 +175,7 @@ func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, return handler.UnknownTransition, err } - return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(originalError, nil)), nil + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil } // When handling the failure node succeeds, the final status will still be failure diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 8c0db1e57a..82354656c8 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -216,6 +216,8 @@ func ToNodePhase(p handler.EPhase) (v1alpha1.NodePhase, error) { return v1alpha1.NodePhaseSucceeding, nil case handler.EPhaseFailed: return v1alpha1.NodePhaseFailing, nil + case handler.EPhaseFailing: + return v1alpha1.NodePhaseFailing, nil case handler.EPhaseTimedout: return v1alpha1.NodePhaseTimingOut, nil case handler.EPhaseRecovered: diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index d3cbfb71b3..644760c41b 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -145,6 +145,7 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha } execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode) + if err != nil { return StatusRunning, err } @@ -293,7 +294,6 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError()) wfEvent.OccurredAt = utils.GetProtoTime(nil) case v1alpha1.WorkflowPhaseHandlingFailureNode: - // TODO: Add core.WorkflowPhaseHandlingFailureNode to idl? wfEvent.Phase = core.WorkflowExecution_FAILING wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError) wStatus.UpdatePhase(v1alpha1.WorkflowPhaseHandlingFailureNode, "", wfEvent.GetError()) From 49e31b6de430b02a4c7a76338de1652ed76bf42c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 30 Nov 2023 00:35:55 -0800 Subject: [PATCH 14/16] More tests Signed-off-by: Kevin Su --- flyte-single-binary-local.yaml | 7 +- .../pkg/compiler/workflow_compiler_test.go | 93 +++++++++++++++++++ .../executors/failure_node_lookup.go | 6 +- .../executors/failure_node_lookup_test.go | 13 ++- .../nodes/subworkflow/subworkflow.go | 2 - .../pkg/controller/nodes/transformers.go | 2 - .../pkg/controller/workflow/executor.go | 1 - .../pkg/controller/workflow/executor_test.go | 4 +- .../workflow/testdata/benchmark_wf.yaml | 2 +- 9 files changed, 111 insertions(+), 19 deletions(-) diff --git a/flyte-single-binary-local.yaml b/flyte-single-binary-local.yaml index 0b04762216..87bacdd5fc 100644 --- a/flyte-single-binary-local.yaml +++ b/flyte-single-binary-local.yaml @@ -15,7 +15,7 @@ cluster_resources: logger: show-source: true - level: 4 + level: 6 propeller: create-flyteworkflow-crd: true @@ -83,8 +83,3 @@ storage: access_key_id: minio secret_key: miniostorage container: my-s3-bucket - - -flyte: - propeller: - disableWebhook: true diff --git a/flytepropeller/pkg/compiler/workflow_compiler_test.go b/flytepropeller/pkg/compiler/workflow_compiler_test.go index e9e01d36c2..abbfabae9e 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler_test.go +++ b/flytepropeller/pkg/compiler/workflow_compiler_test.go @@ -114,6 +114,99 @@ func ExampleCompileWorkflow_basic() { // Compile Errors: } +func TestCompileWorkflowWithFailureNode(t *testing.T) { + inputWorkflow := &core.WorkflowTemplate{ + Id: &core.Identifier{Name: "repo"}, + Interface: &core.TypedInterface{ + Inputs: createEmptyVariableMap(), + Outputs: createEmptyVariableMap(), + }, + Nodes: []*core.Node{ + { + Id: "FirstNode", + Target: &core.Node_TaskNode{ + TaskNode: &core.TaskNode{ + Reference: &core.TaskNode_ReferenceId{ + ReferenceId: &core.Identifier{Name: "task_123"}, + }, + }, + }, + }, + }, + FailureNode: &core.Node{ + Id: "FailureNode", + Target: &core.Node_TaskNode{ + TaskNode: &core.TaskNode{ + Reference: &core.TaskNode_ReferenceId{ + ReferenceId: &core.Identifier{Name: "cleanup"}, + }, + }, + }, + }, + } + + // Detect what other workflows/tasks does this coreWorkflow reference + subWorkflows := make([]*core.WorkflowTemplate, 0) + reqs, err := GetRequirements(inputWorkflow, subWorkflows) + if err != nil { + fmt.Printf("failed to get requirements. Error: %v", err) + return + } + + fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n", + strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","), + strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ",")) + + // Replace with logic to satisfy the requirements + workflows := make([]common.InterfaceProvider, 0) + tasks := []*core.TaskTemplate{ + { + Id: &core.Identifier{Name: "task_123"}, + Interface: &core.TypedInterface{ + Inputs: createEmptyVariableMap(), + Outputs: createEmptyVariableMap(), + }, + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Image: "image://", + Command: []string{"cmd"}, + Args: []string{"args"}, + }, + }, + }, + { + Id: &core.Identifier{Name: "cleanup"}, + Interface: &core.TypedInterface{ + Inputs: createEmptyVariableMap(), + Outputs: createEmptyVariableMap(), + }, + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Image: "image://", + Command: []string{"cmd"}, + Args: []string{"args"}, + }, + }, + }, + } + + compiledTasks := make([]*core.CompiledTask, 0, len(tasks)) + for _, task := range tasks { + compiledTask, err := CompileTask(task) + if err != nil { + fmt.Printf("failed to compile task [%v]. Error: %v", task.Id, err) + return + } + + compiledTasks = append(compiledTasks, compiledTask) + } + + output, errs := CompileWorkflow(inputWorkflow, subWorkflows, compiledTasks, workflows) + assert.Equal(t, output.Primary.Template.FailureNode.Id, "FailureNode") + assert.NotNil(t, output.Primary.Template.FailureNode.GetTaskNode()) + assert.Nil(t, errs) +} + func ExampleCompileWorkflow_inputsOutputsBinding() { inputWorkflow := &core.WorkflowTemplate{ Id: &core.Identifier{Name: "repo"}, diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup.go b/flytepropeller/pkg/controller/executors/failure_node_lookup.go index a67e4b57ad..0c61540259 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup.go @@ -13,14 +13,14 @@ type FailureNodeLookup struct { } func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) { - if node, ok := f.NodeLookup.GetNode(nodeID); ok { - return node, true + if nodeID == v1alpha1.StartNodeID { + return f.NodeLookup.GetNode(nodeID) } return f.FailureNode, true } func (f FailureNodeLookup) GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus { - if _, ok := f.NodeLookup.GetNode(id); ok { + if id == v1alpha1.StartNodeID { return f.NodeLookup.GetNodeExecutionStatus(ctx, id) } return f.FailureNodeStatus diff --git a/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go index 27193603d3..b2dfa32231 100644 --- a/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go +++ b/flytepropeller/pkg/controller/executors/failure_node_lookup_test.go @@ -48,10 +48,19 @@ func TestNewTestFailureNodeLookup(t *testing.T) { r, ok := failureNodeLookup.GetNode(v1alpha1.StartNodeID) assert.True(t, ok) assert.Equal(t, n, r) - assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID)) + assert.Equal(t, ns, failureNodeLookup.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID)) r, ok = failureNodeLookup.GetNode(failureNodeID) assert.True(t, ok) assert.Equal(t, n, r) - assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), failureNodeID)) + assert.Equal(t, ns, failureNodeLookup.GetNodeExecutionStatus(context.TODO(), failureNodeID)) + + nodeIDs, err := failureNodeLookup.ToNode(failureNodeID) + assert.Equal(t, len(nodeIDs), 1) + assert.Equal(t, nodeIDs[0], v1alpha1.StartNodeID) + assert.Nil(t, err) + + nodeIDs, err = failureNodeLookup.FromNode(failureNodeID) + assert.Nil(t, nodeIDs) + assert.Nil(t, err) } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index e7c8c00cd1..5b6c1f02f7 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -150,8 +150,6 @@ func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, } status := nCtx.NodeStatus() subworkflowNodeLookup := executors.NewNodeLookup(subworkflow, status, subworkflow) - // subNodeLookup := nCtx.ContextualNodeLookup() - // TODO: NodeStatus() is deprecated, how do we get the status of the failure node? failureNodeStatus := status.GetNodeExecutionStatus(ctx, failureNode.GetID()) failureNodeLookup := executors.NewFailureNodeLookup(subworkflowNodeLookup, failureNode, failureNodeStatus) diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 82354656c8..8c0db1e57a 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -216,8 +216,6 @@ func ToNodePhase(p handler.EPhase) (v1alpha1.NodePhase, error) { return v1alpha1.NodePhaseSucceeding, nil case handler.EPhaseFailed: return v1alpha1.NodePhaseFailing, nil - case handler.EPhaseFailing: - return v1alpha1.NodePhaseFailing, nil case handler.EPhaseTimedout: return v1alpha1.NodePhaseTimingOut, nil case handler.EPhaseRecovered: diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index 644760c41b..355681a746 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -171,7 +171,6 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) { execErr := executionErrorOrDefault(w.GetExecutionStatus().GetExecutionError(), w.GetExecutionStatus().GetMessage()) failureNode := w.GetOnFailureNode() - logger.Infof(ctx, "Handling FailureNode [%v]", failureNode.GetID()) execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow()) failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID()) diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index 0bbda87c31..cc9910abc3 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -463,7 +463,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { recordedRunning := false recordedFailed := false - recordedFailing := true + recordedFailing := false eventSink := eventMocks.NewMockEventSink() eventSink.SinkCb = func(ctx context.Context, message proto.Message) error { e, ok := message.(*event.WorkflowExecutionEvent) @@ -565,7 +565,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { recordedRunning := false recordedSuccess := false - recordedFailing := true + recordedFailing := false eventSink := eventMocks.NewMockEventSink() eventSink.SinkCb = func(ctx context.Context, message proto.Message) error { e, ok := message.(*event.WorkflowExecutionEvent) diff --git a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml index 1ea8b9abbc..d7a4e07f75 100644 --- a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml +++ b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml @@ -206,7 +206,7 @@ spec: phase: 0 task: sum-non-none onFailure: - id: en0 + id: fn0 inputBindings: - binding: promise: From d7dd52abe2a25f5de72f04d0fe4e74584a69667e Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 30 Nov 2023 12:04:53 -0800 Subject: [PATCH 15/16] nit Signed-off-by: Kevin Su --- .../pkg/controller/workflow/testdata/benchmark_wf.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml index d7a4e07f75..5ce7b0d969 100644 --- a/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml +++ b/flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml @@ -211,8 +211,8 @@ spec: - binding: promise: nodeId: start-node - var: triggered_date - var: triggered_date + var: name + var: name kind: task name: delete-cluster resources: @@ -329,9 +329,9 @@ tasks: interface: inputs: variables: - date_triggered: + name: type: - simple: DATETIME + simple: STRING outputs: variables: { } metadata: From b2ab9aa5b0cf23332baa5690bf9d51477491f723 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 4 Dec 2023 13:52:33 -0800 Subject: [PATCH 16/16] Address comment Signed-off-by: Kevin Su --- flytepropeller/pkg/compiler/workflow_compiler.go | 2 +- .../pkg/compiler/workflow_compiler_test.go | 15 +++------------ .../controller/nodes/subworkflow/subworkflow.go | 3 +-- 3 files changed, 5 insertions(+), 15 deletions(-) diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index 32b3d4d13d..89e82ebd16 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -227,7 +227,7 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile if fg.Template.FailureNode != nil { failureNode := fg.Template.FailureNode - v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope()) + v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false, errs.NewScope()) wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope()) } diff --git a/flytepropeller/pkg/compiler/workflow_compiler_test.go b/flytepropeller/pkg/compiler/workflow_compiler_test.go index abbfabae9e..890655f386 100644 --- a/flytepropeller/pkg/compiler/workflow_compiler_test.go +++ b/flytepropeller/pkg/compiler/workflow_compiler_test.go @@ -148,14 +148,8 @@ func TestCompileWorkflowWithFailureNode(t *testing.T) { // Detect what other workflows/tasks does this coreWorkflow reference subWorkflows := make([]*core.WorkflowTemplate, 0) reqs, err := GetRequirements(inputWorkflow, subWorkflows) - if err != nil { - fmt.Printf("failed to get requirements. Error: %v", err) - return - } - - fmt.Printf("Needed Tasks: [%v], Needed Workflows [%v]\n", - strings.Join(dumpIdentifierNames(reqs.GetRequiredTaskIds()), ","), - strings.Join(dumpIdentifierNames(reqs.GetRequiredLaunchPlanIds()), ",")) + assert.Nil(t, err) + assert.Equal(t, reqs.taskIds, []common.Identifier{{Name: "cleanup"}, {Name: "task_123"}}) // Replace with logic to satisfy the requirements workflows := make([]common.InterfaceProvider, 0) @@ -193,10 +187,7 @@ func TestCompileWorkflowWithFailureNode(t *testing.T) { compiledTasks := make([]*core.CompiledTask, 0, len(tasks)) for _, task := range tasks { compiledTask, err := CompileTask(task) - if err != nil { - fmt.Printf("failed to compile task [%v]. Error: %v", task.Id, err) - return - } + assert.Nil(t, err) compiledTasks = append(compiledTasks, compiledTask) } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index 5b6c1f02f7..ee90fe581e 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -142,8 +142,7 @@ func (s *subworkflowHandler) getExecutionContextForDownstream(nCtx interfaces.No func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx interfaces.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error - failureNode := subworkflow.GetOnFailureNode() - if failureNode != nil { + if failureNode := subworkflow.GetOnFailureNode(); failureNode != nil { execContext, err := s.getExecutionContextForDownstream(nCtx) if err != nil { return handler.UnknownTransition, err