Skip to content

Commit

Permalink
fix: never_fail in sensor (#40915)
Browse files Browse the repository at this point in the history
* fix: never_fail in sensor

* fix: tests

* fix: tests 2

* review 1 - doc

---------

Co-authored-by: raphaelauv <[email protected]>
  • Loading branch information
raphaelauv and raphaelauv authored Jul 21, 2024
1 parent bc51c94 commit dc2f2dd
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
16 changes: 13 additions & 3 deletions airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
Sensor operators keep executing at a time interval and succeed when
a criteria is met and fail if and when they time out.
:param soft_fail: Set to true to mark the task as SKIPPED on failure
:param soft_fail: Set to true to mark the task as SKIPPED on failure.
Mutually exclusive with never_fail.
:param poke_interval: Time that the job should wait in between each try.
Can be ``timedelta`` or ``float`` seconds.
:param timeout: Time elapsed before the task times out and fails.
Expand Down Expand Up @@ -154,6 +155,8 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
and AirflowFailException, the sensor will log the error and continue
its execution. Otherwise, the sensor task fails, and it can be retried
based on the provided `retries` parameter.
:param never_fail: If true, and poke method raises an exception, sensor will be skipped.
Mutually exclusive with soft_fail.
"""

ui_color: str = "#e6f1f2"
Expand All @@ -173,6 +176,7 @@ def __init__(
exponential_backoff: bool = False,
max_wait: timedelta | float | None = None,
silent_fail: bool = False,
never_fail: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -182,7 +186,11 @@ def __init__(
self.mode = mode
self.exponential_backoff = exponential_backoff
self.max_wait = self._coerce_max_wait(max_wait)
if soft_fail is True and never_fail is True:
raise ValueError("soft_fail and never_fail are mutually exclusive, you can not provide both.")

self.silent_fail = silent_fail
self.never_fail = never_fail
self._validate_input_values()

@staticmethod
Expand Down Expand Up @@ -283,15 +291,17 @@ def run_duration() -> float:
) as e:
if self.soft_fail:
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
elif self.never_fail:
raise AirflowSkipException("Skipping due to never_fail is set to True.") from e
raise e
except AirflowSkipException as e:
raise e
except Exception as e:
if self.silent_fail:
self.log.error("Sensor poke failed: \n %s", traceback.format_exc())
poke_return = False
elif self.soft_fail:
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
elif self.never_fail:
raise AirflowSkipException("Skipping due to never_fail is set to True.") from e
else:
raise e

Expand Down
38 changes: 36 additions & 2 deletions tests/sensors/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,33 @@ def test_soft_fail(self, make_sensor):
if ti.task_id == DUMMY_OP:
assert ti.state == State.NONE

@pytest.mark.parametrize(
"exception_cls",
(ValueError,),
)
def test_soft_fail_with_exception(self, make_sensor, exception_cls):
sensor, dr = make_sensor(False, soft_fail=True)
sensor.poke = Mock(side_effect=[exception_cls(None)])
with pytest.raises(ValueError):
self._run(sensor)

tis = dr.get_task_instances()
assert len(tis) == 2
for ti in tis:
if ti.task_id == SENSOR_OP:
assert ti.state == State.FAILED
if ti.task_id == DUMMY_OP:
assert ti.state == State.NONE

@pytest.mark.parametrize(
"exception_cls",
(
AirflowSensorTimeout,
AirflowTaskTimeout,
AirflowFailException,
Exception,
),
)
def test_soft_fail_with_non_skip_exception(self, make_sensor, exception_cls):
def test_soft_fail_with_skip_exception(self, make_sensor, exception_cls):
sensor, dr = make_sensor(False, soft_fail=True)
sensor.poke = Mock(side_effect=[exception_cls(None)])

Expand All @@ -212,6 +229,23 @@ def test_soft_fail_with_non_skip_exception(self, make_sensor, exception_cls):
if ti.task_id == DUMMY_OP:
assert ti.state == State.NONE

@pytest.mark.parametrize(
"exception_cls",
(AirflowSensorTimeout, AirflowTaskTimeout, AirflowFailException, Exception),
)
def test_never_fail_with_skip_exception(self, make_sensor, exception_cls):
sensor, dr = make_sensor(False, never_fail=True)
sensor.poke = Mock(side_effect=[exception_cls(None)])

self._run(sensor)
tis = dr.get_task_instances()
assert len(tis) == 2
for ti in tis:
if ti.task_id == SENSOR_OP:
assert ti.state == State.SKIPPED
if ti.task_id == DUMMY_OP:
assert ti.state == State.NONE

def test_soft_fail_with_retries(self, make_sensor):
sensor, dr = make_sensor(
return_value=False, soft_fail=True, retries=1, retry_delay=timedelta(milliseconds=1)
Expand Down
3 changes: 1 addition & 2 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ def test_fail_poke(
),
(
True,
AirflowSkipException,
AirflowException,
),
),
)
Expand Down Expand Up @@ -982,7 +982,6 @@ def test_fail__check_for_existence(
check_existence=True,
**kwargs,
)
expected_message = "Skipping due to soft_fail is set to True." if soft_fail else expected_message
with pytest.raises(expected_exception, match=expected_message):
op.execute(context={})

Expand Down

0 comments on commit dc2f2dd

Please sign in to comment.