Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local execution with compiled graph(#4080) #1917

Open
wants to merge 23 commits into
base: master
Choose a base branch
from

Conversation

ericwudayi
Copy link
Contributor

@ericwudayi ericwudayi commented Oct 24, 2023

TL;DR

Support user-defined execution sequence in local pyfylte. Now subworkflow/tasks can be sequence by >> operation within workflow. for example:

@workflow()
def wf():
  a = print_a() # task: print ("a")
  b = print_b() # task: print ("b")
  b >> a
wf() # should output "b\na"

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

  1. Support BranchNode local execution by adding ConditionalSection in __init__
  2. Add __call__ in BrachNode to eval all its expression via kwargs.
  3. Eval all the _lhs and _rhs of ComparisonExpression and ConjunctionExpression via DFS.
  4. Support local execution on Gate --> Need check, I just use the first key in kwargs as output.
  5. Implemented the Core algorithm.
    a. Do topological sort on workflow._nodes, check if cycle exists.
    b. Follow the implementation in ImperativeWorkflow
    c. Support dynamic by compiled one more time.
    d. Recursive execution with BranchNode
    f. Modify dynamic local execution flow

Tracking Issue

flyteorg/flyte#4080

Follow-up issue

NA
OR
https://github.com/flyteorg/flyte/issues/

@codecov
Copy link

codecov bot commented Oct 24, 2023

Codecov Report

Attention: 119 lines in your changes are missing coverage. Please review.

Comparison is base (744c167) 85.80% compared to head (602a55b) 54.25%.
Report is 44 commits behind head on master.

Files Patch % Lines
flytekit/core/workflow.py 14.44% 73 Missing and 4 partials ⚠️
flytekit/core/condition.py 21.21% 26 Missing ⚠️
flytekit/core/python_function_task.py 0.00% 13 Missing ⚠️
flytekit/core/gate.py 25.00% 3 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #1917       +/-   ##
===========================================
- Coverage   85.80%   54.25%   -31.55%     
===========================================
  Files         313      173      -140     
  Lines       23278    16937     -6341     
  Branches     3526     3502       -24     
===========================================
- Hits        19973     9190    -10783     
- Misses       2702     7332     +4630     
+ Partials      603      415      -188     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Da-Yi Wu <[email protected]>

Resolve dynamic cases by compile one more time

Signed-off-by: Da-Yi Wu <[email protected]>

Modify Gate local execution

Signed-off-by: Da-Yi Wu <[email protected]>
@ericwudayi ericwudayi changed the title BranchNode local execution(#4080) Local execution with compiled graph(#4080) Oct 27, 2023
@ericwudayi
Copy link
Contributor Author

ericwudayi commented Oct 27, 2023

Actually, I run all the workflow testing file by replacing the execution function with newer one, execution_with_graph. I pass almost all the tests except for following nine. I take a deep dig in following cases, and they can be summarized into three cases. (@pingsutw )

  1. Failing condition on BranchNode. --> also failed on remote. (It would not show the failing message now)
  2. Type checking. --> cannot check using node-based execution.
@workflow
def wf():
  return 1
## should raise error 
  1. Regex error. --> missing the line cannot be recovered by node-based execution.
Screenshot 2023-10-25 at 9 37 16 AM

Signed-off-by: Da-Yi Wu <[email protected]>
@ericwudayi ericwudayi marked this pull request as ready for review October 27, 2023 03:36
@samhita-alla
Copy link
Contributor

@eapolinario @pingsutw @wild-endeavor can one of you please review?

ericwudayi and others added 6 commits November 4, 2023 16:34
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
flytekit/core/condition.py Outdated Show resolved Hide resolved
@@ -195,6 +195,8 @@ def __init__(
self._nodes: List[Node] = []
self._output_bindings: List[_literal_models.Binding] = []
self._docs = docs
# Create a map that holds the outputs of each node.
self._intermediate_node_outputs: Dict[Node, Dict[str, Promise]] = {GLOBAL_START_NODE: {}}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this line may also have the node execution independency issue. If we execute the same node twice, this intermediate_node_output is not None before the second execution. Not sure if this would have any problem, maybe we can clear the state on the begining of execution_node?

pingsutw and others added 9 commits November 21, 2023 02:34
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Da-Yi Wu <[email protected]>
Signed-off-by: Da-Yi Wu <[email protected]>
Signed-off-by: Da-Yi Wu <[email protected]>
Signed-off-by: Da-Yi Wu <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants