-
Notifications
You must be signed in to change notification settings - Fork 305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core feature] Flytekit should support unsafe
mode for types
#2419
Changes from 3 commits
3570363
8c7e5fa
c7c9a2a
1524c81
0519fe6
9885189
f107b35
8db4bce
aa1760c
df0f898
8368723
1b5bfeb
51ea34c
d35c231
933458a
11b5d9c
75296df
7db0869
43f8948
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ | |
from flytekit.core.resources import Resources | ||
from flytekit.core.task import TaskMetadata, task | ||
from flytekit.core.testing import patch, task_mock | ||
from flytekit.core.type_engine import RestrictedTypeError, SimpleTransformer, TypeEngine | ||
from flytekit.core.type_engine import RestrictedTypeError, SimpleTransformer, TypeEngine, TypeTransformerFailedError | ||
from flytekit.core.workflow import workflow | ||
from flytekit.exceptions.user import FlyteValidationException | ||
from flytekit.models import literals as _literal_models | ||
|
@@ -80,7 +80,9 @@ def test_forwardref_namedtuple_output(): | |
# This test case tests typing.NamedTuple outputs for cases where eg. | ||
# from __future__ import annotations is enabled, such that all type hints become ForwardRef | ||
@task | ||
def my_task(a: int) -> typing.NamedTuple("OutputsBC", b=typing.ForwardRef("int"), c=typing.ForwardRef("str")): | ||
def my_task( | ||
a: int, | ||
) -> typing.NamedTuple("OutputsBC", b=typing.ForwardRef("int"), c=typing.ForwardRef("str")): | ||
ctx = flytekit.current_context() | ||
assert str(ctx.execution_id) == "ex:local:local:local" | ||
return a + 2, "hello world" | ||
|
@@ -1915,7 +1917,12 @@ def wf() -> pd.DataFrame: | |
|
||
df = wf() | ||
|
||
expected_df = pd.DataFrame(data={"col1": [1 + 10 + 100, 2 + 20 + 200], "col2": [3 + 30 + 300, 4 + 40 + 400]}) | ||
expected_df = pd.DataFrame( | ||
data={ | ||
"col1": [1 + 10 + 100, 2 + 20 + 200], | ||
"col2": [3 + 30 + 300, 4 + 40 + 400], | ||
} | ||
) | ||
assert expected_df.equals(df) | ||
|
||
|
||
|
@@ -2000,3 +2007,59 @@ def my_wf(a: int, retries: int) -> int: | |
|
||
with pytest.raises(AssertionError): | ||
my_wf(a=1, retries=1) | ||
|
||
|
||
def test_unsafe_input_wf_and_task(): | ||
@task(unsafe=True) | ||
def t1(a) -> int: | ||
if type(a) == int: | ||
return a + 1 | ||
return 0 | ||
|
||
@task | ||
def t2_wo_unsafe(a) -> int: | ||
return a + 1 | ||
|
||
@workflow(unsafe=True) | ||
def wf1_with_unsafe(a) -> int: | ||
return t1(a=a) | ||
|
||
assert wf1_with_unsafe(a=1) == 2 | ||
assert wf1_with_unsafe(a="1") == 0 | ||
assert wf1_with_unsafe(a=None) == 0 | ||
|
||
@workflow | ||
def wf1_wo_unsafe(a) -> int: | ||
return t1(a=a) | ||
|
||
@workflow | ||
def wf1_wo_unsafe2(a: int) -> int: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you test this workflow in the sandbox cluster? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, I only test it on my local environment. Can I ask how to try it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR can help you, you can either use imageSpec or Dockerfile to create your own image. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above code will fail. The reason is detailed in flyteorg/flyte#5261. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Currently, the input with a value |
||
return t2_wo_unsafe(a=a) | ||
|
||
with pytest.raises(TypeError): | ||
wf1_wo_unsafe(a=1) | ||
|
||
with pytest.raises(TypeTransformerFailedError): | ||
wf1_wo_unsafe2(a=1) | ||
|
||
|
||
def test_unsafe_wf_and_task(): | ||
@task(unsafe=True) | ||
def t1(a): | ||
if type(a) != int: | ||
return None | ||
return a + 1 | ||
|
||
@task(unsafe=True) | ||
def t2(a): | ||
if type(a) != int: | ||
return None | ||
return a + 2 | ||
|
||
@workflow(unsafe=True) | ||
def wf1_with_unsafe(a): | ||
a1 = t1(a=a) | ||
return t2(a=a1) | ||
|
||
assert wf1_with_unsafe(a=1) == 4 | ||
assert wf1_with_unsafe(a="1") is None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to have a more explicit name. Something like: