From ae7d973652555f25f30e0b0bc161d4dcedcc07ca Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Tue, 10 Oct 2023 20:02:41 +0800 Subject: [PATCH 1/3] Add min_success_ratio logic in map_task._raw_execute. Add test. Signed-off-by: Chao-Heng Lee --- flytekit/core/map_task.py | 17 +++++++++++--- tests/flytekit/unit/core/test_map_task.py | 27 +++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index e47b731ac6..febcd82b7b 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -269,6 +269,11 @@ def _raw_execute(self, **kwargs) -> Any: else None ) + failed_count = 0 + min_successes = len(kwargs[any_input_key]) + if self._min_success_ratio: + min_successes = int(min_successes * self._min_success_ratio) + for i in range(len(kwargs[any_input_key])): single_instance_inputs = {} for k in self.interface.inputs.keys(): @@ -277,9 +282,15 @@ def _raw_execute(self, **kwargs) -> Any: single_instance_inputs[k] = kwargs[k][i] else: single_instance_inputs[k] = kwargs[k] - o = exception_scopes.user_entry_point(self._run_task.execute)(**single_instance_inputs) - if outputs_expected: - outputs.append(o) + try: + o = exception_scopes.user_entry_point(self._run_task.execute)(**single_instance_inputs) + if outputs_expected: + outputs.append(o) + except Exception as exc: + outputs.append(None) + failed_count += 1 + if len(kwargs[any_input_key]) - failed_count < min_successes: + raise exc return outputs diff --git a/tests/flytekit/unit/core/test_map_task.py b/tests/flytekit/unit/core/test_map_task.py index f66ab7bd49..7c45e27b87 100644 --- a/tests/flytekit/unit/core/test_map_task.py +++ b/tests/flytekit/unit/core/test_map_task.py @@ -282,3 +282,30 @@ def my_wf1() -> typing.List[type_t]: return map_task(some_task1, min_success_ratio=min_success_ratio)(inputs=[1, 2, 3, 4]) my_wf1() + + +@pytest.mark.parametrize( + "min_success_ratio, should_raise_error", + [ + (None, True), + (1, True), + (0.75, False), + (0.5, False), + ], +) +def test_raw_execute_with_min_success_ratio(min_success_ratio, should_raise_error): + @task + def some_task1(inputs: int) -> int: + if inputs == 2: + raise ValueError("Unexpected inputs: 2") + return inputs + + @workflow + def my_wf1() -> typing.List[typing.Optional[int]]: + return map_task(some_task1, min_success_ratio=min_success_ratio)(inputs=[1, 2, 3, 4]) + + if should_raise_error: + with (pytest.raises(ValueError)): + my_wf1() + else: + my_wf1() From 03c716329bb21e2a6506072ac8ea8beeef7b3245 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Sat, 14 Oct 2023 12:29:48 +0800 Subject: [PATCH 2/3] also update array_node_map_task. Signed-off-by: Chao-Heng Lee --- flytekit/core/array_node_map_task.py | 20 +++++++++++-- flytekit/core/map_task.py | 3 +- .../unit/core/test_array_node_map_task.py | 28 +++++++++++++++++++ tests/flytekit/unit/core/test_map_task.py | 2 +- 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index aafbaea3ad..15ca87ec4b 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -2,6 +2,7 @@ import functools import hashlib import logging +import math import os # TODO: use flytekit logger from contextlib import contextmanager from typing import Dict, List, Optional, Set, Union, cast @@ -276,6 +277,13 @@ def _raw_execute(self, **kwargs) -> Any: else None ) + failed_count = 0 + min_successes = len(kwargs[any_input_key]) + if self._min_successes: + min_successes = self._min_successes + elif self._min_success_ratio: + min_successes = math.ceil(min_successes * self._min_success_ratio) + for i in range(len(kwargs[any_input_key])): single_instance_inputs = {} for k in self.interface.inputs.keys(): @@ -284,9 +292,15 @@ def _raw_execute(self, **kwargs) -> Any: single_instance_inputs[k] = kwargs[k][i] else: single_instance_inputs[k] = kwargs[k] - o = exception_scopes.user_entry_point(self.python_function_task.execute)(**single_instance_inputs) - if outputs_expected: - outputs.append(o) + try: + o = exception_scopes.user_entry_point(self._run_task.execute)(**single_instance_inputs) + if outputs_expected: + outputs.append(o) + except Exception as exc: + outputs.append(None) + failed_count += 1 + if len(kwargs[any_input_key]) - failed_count < min_successes: + raise exc return outputs diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index febcd82b7b..99231c5948 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -5,6 +5,7 @@ import functools import hashlib import logging +import math import os import typing from contextlib import contextmanager @@ -272,7 +273,7 @@ def _raw_execute(self, **kwargs) -> Any: failed_count = 0 min_successes = len(kwargs[any_input_key]) if self._min_success_ratio: - min_successes = int(min_successes * self._min_success_ratio) + min_successes = math.ceil(min_successes * self._min_success_ratio) for i in range(len(kwargs[any_input_key])): single_instance_inputs = {} diff --git a/tests/flytekit/unit/core/test_array_node_map_task.py b/tests/flytekit/unit/core/test_array_node_map_task.py index 2de15667d5..37a12a2845 100644 --- a/tests/flytekit/unit/core/test_array_node_map_task.py +++ b/tests/flytekit/unit/core/test_array_node_map_task.py @@ -1,4 +1,5 @@ import functools +import typing from collections import OrderedDict from typing import List @@ -230,3 +231,30 @@ def many_outputs(a: int) -> (int, str): with pytest.raises(ValueError): _ = array_node_map_task(many_outputs) + + +@pytest.mark.parametrize( + "min_success_ratio, should_raise_error", + [ + (None, True), + (1, True), + (0.75, False), + (0.5, False), + ], +) +def test_raw_execute_with_min_success_ratio(min_success_ratio, should_raise_error): + @task + def some_task1(inputs: int) -> int: + if inputs == 2: + raise ValueError("Unexpected inputs: 2") + return inputs + + @workflow + def my_wf1() -> typing.List[typing.Optional[int]]: + return array_node_map_task(some_task1, min_success_ratio=min_success_ratio)(inputs=[1, 2, 3, 4]) + + if should_raise_error: + with (pytest.raises(ValueError)): + my_wf1() + else: + assert my_wf1() == [1, None, 3, 4] diff --git a/tests/flytekit/unit/core/test_map_task.py b/tests/flytekit/unit/core/test_map_task.py index 7c45e27b87..bfb9fef168 100644 --- a/tests/flytekit/unit/core/test_map_task.py +++ b/tests/flytekit/unit/core/test_map_task.py @@ -308,4 +308,4 @@ def my_wf1() -> typing.List[typing.Optional[int]]: with (pytest.raises(ValueError)): my_wf1() else: - my_wf1() + assert my_wf1() == [1, None, 3, 4] From 5d4f11563686f433f1216994c1fd5276fd2e9ab0 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Fri, 20 Oct 2023 16:34:53 +0800 Subject: [PATCH 3/3] add log with error. Signed-off-by: Chao-Heng Lee --- flytekit/core/array_node_map_task.py | 2 ++ flytekit/core/map_task.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/flytekit/core/array_node_map_task.py b/flytekit/core/array_node_map_task.py index df8cf5a663..ef1fc510e1 100644 --- a/flytekit/core/array_node_map_task.py +++ b/flytekit/core/array_node_map_task.py @@ -16,6 +16,7 @@ from flytekit.core.python_function_task import PythonFunctionTask, PythonInstanceTask from flytekit.core.utils import timeit from flytekit.exceptions import scopes as exception_scopes +from flytekit.loggers import logger from flytekit.models.array_job import ArrayJob from flytekit.models.core.workflow import NodeMetadata from flytekit.models.interface import Variable @@ -300,6 +301,7 @@ def _raw_execute(self, **kwargs) -> Any: outputs.append(None) failed_count += 1 if mapped_tasks_count - failed_count < min_successes: + logger.error("The number of successful tasks is lower than the minimum ratio") raise exc return outputs diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 9b648e278b..5732a937ca 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -21,6 +21,7 @@ from flytekit.core.tracker import TrackedInstance from flytekit.core.utils import timeit from flytekit.exceptions import scopes as exception_scopes +from flytekit.loggers import logger from flytekit.models.array_job import ArrayJob from flytekit.models.interface import Variable from flytekit.models.task import Container, K8sPod, Sql @@ -293,6 +294,7 @@ def _raw_execute(self, **kwargs) -> Any: outputs.append(None) failed_count += 1 if mapped_tasks_count - failed_count < min_successes: + logger.error("The number of successful tasks is lower than the minimum ratio") raise exc return outputs