Skip to content

Commit

Permalink
airflow.models.taskinstance deprecations removed (#41784)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirrao authored Aug 27, 2024
1 parent a0baa68 commit 615cddf
Show file tree
Hide file tree
Showing 12 changed files with 26 additions and 188 deletions.
171 changes: 0 additions & 171 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import jinja2
import lazy_object_proxy
import pendulum
from deprecated import deprecated
from jinja2 import TemplateAssertionError, UndefinedError
from sqlalchemy import (
Column,
Expand Down Expand Up @@ -80,7 +79,6 @@
AirflowSkipException,
AirflowTaskTerminated,
AirflowTaskTimeout,
DagRunNotFound,
RemovedInAirflow3Warning,
TaskDeferralError,
TaskDeferred,
Expand Down Expand Up @@ -425,7 +423,6 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance | TaskInstancePydantic,
def clear_task_instances(
tis: list[TaskInstance],
session: Session,
activate_dag_runs: None = None,
dag: DAG | None = None,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
) -> None:
Expand All @@ -443,7 +440,6 @@ def clear_task_instances(
:param dag_run_state: state to set finished DagRuns to.
If set to False, DagRuns state will not be changed.
:param dag: DAG object
:param activate_dag_runs: Deprecated parameter, do not pass
"""
job_ids = []
# Keys: dag_id -> run_id -> map_indexes -> try_numbers -> task_id
Expand Down Expand Up @@ -521,16 +517,6 @@ def clear_task_instances(

session.execute(update(Job).where(Job.id.in_(job_ids)).values(state=JobState.RESTARTING))

if activate_dag_runs is not None:
warnings.warn(
"`activate_dag_runs` parameter to clear_task_instances function is deprecated. "
"Please use `dag_run_state`",
RemovedInAirflow3Warning,
stacklevel=2,
)
if not activate_dag_runs:
dag_run_state = False

if dag_run_state is not False and tis:
from airflow.models.dagrun import DagRun # Avoid circular import

Expand Down Expand Up @@ -1922,7 +1908,6 @@ class TaskInstance(Base, LoggingMixin):
def __init__(
self,
task: Operator,
execution_date: datetime | None = None,
run_id: str | None = None,
state: str | None = None,
map_index: int = -1,
Expand All @@ -1938,42 +1923,7 @@ def __init__(
# init_on_load will config the log
self.init_on_load()

if run_id is None and execution_date is not None:
from airflow.models.dagrun import DagRun # Avoid circular import

warnings.warn(
"Passing an execution_date to `TaskInstance()` is deprecated in favour of passing a run_id",
RemovedInAirflow3Warning,
# Stack level is 4 because SQLA adds some wrappers around the constructor
stacklevel=4,
)
# make sure we have a localized execution_date stored in UTC
if execution_date and not timezone.is_localized(execution_date):
self.log.warning(
"execution date %s has no timezone information. Using default from dag or system",
execution_date,
)
if self.task.has_dag():
if TYPE_CHECKING:
assert self.task.dag
execution_date = timezone.make_aware(execution_date, self.task.dag.timezone)
else:
execution_date = timezone.make_aware(execution_date)

execution_date = timezone.convert_to_utc(execution_date)
with create_session() as session:
run_id = (
session.query(DagRun.run_id)
.filter_by(dag_id=self.dag_id, execution_date=execution_date)
.scalar()
)
if run_id is None:
raise DagRunNotFound(
f"DagRun for {self.dag_id!r} with date {execution_date} not found"
) from None

self.run_id = run_id

self.try_number = 0
self.max_tries = self.task.retries
self.unixname = getuser()
Expand All @@ -1989,26 +1939,6 @@ def __init__(
def __hash__(self):
return hash((self.task_id, self.dag_id, self.run_id, self.map_index))

@property
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
def _try_number(self):
"""
Do not use. For semblance of backcompat.
:meta private:
"""
return self.try_number

@_try_number.setter
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
def _try_number(self, val):
"""
Do not use. For semblance of backcompat.
:meta private:
"""
self.try_number = val

@property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
Expand Down Expand Up @@ -2051,23 +1981,6 @@ def init_on_load(self) -> None:
"""Initialize the attributes that aren't stored in the DB."""
self.test_mode = False # can be changed when calling 'run'

@property
@deprecated(reason="Use try_number instead.", version="2.10.0", category=RemovedInAirflow3Warning)
def prev_attempted_tries(self) -> int:
"""
Calculate the total number of attempted tries, defaulting to 0.
This used to be necessary because try_number did not always tell the truth.
:meta private:
"""
return self.try_number

@property
def next_try_number(self) -> int:
# todo (dstandish): deprecate this property; we don't need a property that is just + 1
return self.try_number + 1

@property
def operator_name(self) -> str | None:
"""@property: use a more friendly display name for the operator, if set."""
Expand Down Expand Up @@ -2498,40 +2411,6 @@ def get_previous_ti(
"""
return _get_previous_ti(task_instance=self, state=state, session=session)

@property
def previous_ti(self) -> TaskInstance | TaskInstancePydantic | None:
"""
This attribute is deprecated.
Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
"""
warnings.warn(
"""
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
""",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_previous_ti()

@property
def previous_ti_success(self) -> TaskInstance | TaskInstancePydantic | None:
"""
This attribute is deprecated.
Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_ti`.
"""
warnings.warn(
"""
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_ti` method.
""",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_previous_ti(state=DagRunState.SUCCESS)

@provide_session
def get_previous_execution_date(
self,
Expand All @@ -2558,23 +2437,6 @@ def get_previous_start_date(
"""
return _get_previous_start_date(task_instance=self, state=state, session=session)

@property
def previous_start_date_success(self) -> pendulum.DateTime | None:
"""
This attribute is deprecated.
Please use :class:`airflow.models.taskinstance.TaskInstance.get_previous_start_date`.
"""
warnings.warn(
"""
This attribute is deprecated.
Please use `airflow.models.taskinstance.TaskInstance.get_previous_start_date` method.
""",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_previous_start_date(state=DagRunState.SUCCESS)

@provide_session
def are_dependencies_met(
self, dep_context: DepContext | None = None, session: Session = NEW_SESSION, verbose: bool = False
Expand Down Expand Up @@ -4115,21 +3977,6 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__
return NotImplemented

def as_dict(self):
warnings.warn(
"This method is deprecated. Use BaseSerialization.serialize.",
RemovedInAirflow3Warning,
stacklevel=2,
)
new_dict = dict(self.__dict__)
for key in new_dict:
if key in ["start_date", "end_date"]:
val = new_dict[key]
if not val or isinstance(val, str):
continue
new_dict.update({key: val.isoformat()})
return new_dict

@classmethod
def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
return cls(
Expand All @@ -4150,24 +3997,6 @@ def from_ti(cls, ti: TaskInstance) -> SimpleTaskInstance:
priority_weight=ti.priority_weight if hasattr(ti, "priority_weight") else None,
)

@classmethod
def from_dict(cls, obj_dict: dict) -> SimpleTaskInstance:
warnings.warn(
"This method is deprecated. Use BaseSerialization.deserialize.",
RemovedInAirflow3Warning,
stacklevel=2,
)
ti_key = TaskInstanceKey(*obj_dict.pop("key"))
start_date = None
end_date = None
start_date_str: str | None = obj_dict.pop("start_date")
end_date_str: str | None = obj_dict.pop("end_date")
if start_date_str:
start_date = timezone.parse(start_date_str)
if end_date_str:
end_date = timezone.parse(end_date_str)
return cls(**obj_dict, start_date=start_date, end_date=end_date, key=ti_key)


class TaskInstanceNote(TaskInstanceDependencies):
"""For storage of arbitrary notes concerning the task instance."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
airflow_cmd=ti.command_as_list(),
queue=ti.queue,
exec_config=ti.executor_config,
attempt_number=ti.prev_attempted_tries,
attempt_number=ti.try_number,
)
adopted_tis.append(ti)

Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/file_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ def read(self, task_instance, try_number=None, metadata=None):
# try number gets incremented in DB, i.e logs produced the time
# after cli run and before try_number + 1 in DB will not be displayed.
if try_number is None:
next_try = task_instance.next_try_number
next_try = task_instance.try_number + 1
try_numbers = list(range(1, next_try))
elif try_number < 1:
logs = [
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/log/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
:param metadata: A dictionary containing information about how to read the task log
"""
if try_number is None:
next_try = ti.next_try_number
next_try = ti.try_number + 1
try_numbers = list(range(1, next_try))
else:
try_numbers = [try_number]
Expand Down
12 changes: 12 additions & 0 deletions newsfragments/41784.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Removed a set of deprecations from ``airflow.models.taskinstance``.

- Removed deprecated arg ``activate_dag_runs`` from the ``clear_task_instances()`` function. Please use ``dag_run_state`` instead (pass ``dag_run_state=False`` where ``activate_dag_runs=False`` was used).
- Removed deprecated arg ``execution_date`` from ``TaskInstance.__init__()``. Please use ``run_id`` instead.
- Removed deprecated property ``_try_number`` from ``TaskInstance``. Please use ``try_number`` instead.
- Removed deprecated property ``prev_attempted_tries`` from ``TaskInstance``. Please use ``try_number`` instead.
- Removed property ``next_try_number`` from ``TaskInstance``. Please use ``try_number + 1`` instead.
- Removed deprecated property ``previous_ti`` from ``TaskInstance``. Please use ``get_previous_ti`` instead.
- Removed deprecated property ``previous_ti_success`` from ``TaskInstance``. Please use ``get_previous_ti(state=DagRunState.SUCCESS)`` instead.
- Removed deprecated property ``previous_start_date_success`` from ``TaskInstance``. Please use ``get_previous_start_date(state=DagRunState.SUCCESS)`` instead.
- Removed deprecated function ``as_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.serialize`` instead.
- Removed deprecated function ``from_dict`` from ``SimpleTaskInstance``. Please use ``BaseSerialization.deserialize`` instead.
2 changes: 1 addition & 1 deletion tests/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def create_trigger_in_db(session, trigger, operator=None):
operator.dag = dag
else:
operator = BaseOperator(task_id="test_ti", dag=dag)
task_instance = TaskInstance(operator, execution_date=run.execution_date, run_id=run.run_id)
task_instance = TaskInstance(operator, run_id=run.run_id)
task_instance.trigger_id = trigger_orm.id
session.add(dag_model)
session.add(run)
Expand Down
6 changes: 3 additions & 3 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,11 +1094,11 @@ def test_get_task_instances(session):
"run_type": DagRunType.MANUAL,
}
dr1 = DagRun(execution_date=first_execution_date, run_id="test_run_id_1", **common_dr_kwargs)
ti_1 = TaskInstance(run_id=dr1.run_id, task=task, execution_date=first_execution_date)
ti_1 = TaskInstance(run_id=dr1.run_id, task=task)
dr2 = DagRun(execution_date=second_execution_date, run_id="test_run_id_2", **common_dr_kwargs)
ti_2 = TaskInstance(run_id=dr2.run_id, task=task, execution_date=second_execution_date)
ti_2 = TaskInstance(run_id=dr2.run_id, task=task)
dr3 = DagRun(execution_date=third_execution_date, run_id="test_run_id_3", **common_dr_kwargs)
ti_3 = TaskInstance(run_id=dr3.run_id, task=task, execution_date=third_execution_date)
ti_3 = TaskInstance(run_id=dr3.run_id, task=task)
session.add_all([dr1, dr2, dr3, ti_1, ti_2, ti_3])
session.commit()

Expand Down
2 changes: 1 addition & 1 deletion tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ def task_2(arg2): ...
assert len(decision.schedulable_tis) == 2

# We insert a faulty record
session.add(TaskInstance(dag.get_task("task_2"), dr.execution_date, dr.run_id))
session.add(TaskInstance(task=dag.get_task("task_2"), run_id=dr.run_id))
session.flush()

decision = dr.task_instance_scheduling_decisions()
Expand Down
5 changes: 1 addition & 4 deletions tests/providers/microsoft/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,11 @@ class MockedTaskInstance(TaskInstance):
def __init__(
self,
task,
execution_date: datetime | None = None,
run_id: str | None = "run_id",
state: str | None = TaskInstanceState.RUNNING,
map_index: int = -1,
):
super().__init__(
task=task, execution_date=execution_date, run_id=run_id, state=state, map_index=map_index
)
super().__init__(task=task, run_id=run_id, state=state, map_index=map_index)
self.values: dict[str, Any] = {}

def xcom_pull(
Expand Down
4 changes: 2 additions & 2 deletions tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_concurrency_reached(self):
"""
dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=True))
task = Mock(dag=dag, pool_slots=1)
ti = TaskInstance(task, execution_date=None)
ti = TaskInstance(task)

assert not DagTISlotsAvailableDep().is_met(ti=ti)

Expand All @@ -44,6 +44,6 @@ def test_all_conditions_met(self):
"""
dag = Mock(concurrency=1, get_concurrency_reached=Mock(return_value=False))
task = Mock(dag=dag, pool_slots=1)
ti = TaskInstance(task, execution_date=None)
ti = TaskInstance(task)

assert DagTISlotsAvailableDep().is_met(ti=ti)
4 changes: 2 additions & 2 deletions tests/ti_deps/deps/test_dag_unpaused_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_concurrency_reached(self):
"""
dag = Mock(**{"get_is_paused.return_value": True})
task = Mock(dag=dag)
ti = TaskInstance(task=task, execution_date=None)
ti = TaskInstance(task=task)

assert not DagUnpausedDep().is_met(ti=ti)

Expand All @@ -44,6 +44,6 @@ def test_all_conditions_met(self):
"""
dag = Mock(**{"get_is_paused.return_value": False})
task = Mock(dag=dag)
ti = TaskInstance(task=task, execution_date=None)
ti = TaskInstance(task=task)

assert DagUnpausedDep().is_met(ti=ti)
2 changes: 1 addition & 1 deletion tests/ti_deps/deps/test_not_in_retry_period_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
class TestNotInRetryPeriodDep:
def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)):
task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False)
ti = TaskInstance(task=task, state=state, execution_date=None)
ti = TaskInstance(task=task, state=state)
ti.end_date = end_date
return ti

Expand Down

0 comments on commit 615cddf

Please sign in to comment.