Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airflow 3: airflow date utils date_range and days_ago deprecations removal #41496

Merged
merged 6 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 3 additions & 118 deletions airflow/utils/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
# under the License.
from __future__ import annotations

import warnings
from datetime import datetime, timedelta
from typing import Collection
from typing import TYPE_CHECKING, Collection

from croniter import croniter
from dateutil.relativedelta import relativedelta # for doctest

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.typing_compat import Literal
from airflow.utils import timezone

Expand All @@ -37,103 +34,8 @@
"@yearly": "0 0 1 1 *",
}


def date_range(
    start_date: datetime,
    end_date: datetime | None = None,
    num: int | None = None,
    delta: str | timedelta | relativedelta | None = None,
) -> list[datetime]:
    """
    Build a sorted list of datetimes starting at ``start_date`` and spaced by ``delta``.

    Deprecated: every call emits a ``RemovedInAirflow3Warning`` that points to
    ``airflow.timetables``.

    .. code-block:: pycon

        >>> from airflow.utils.dates import date_range
        >>> from datetime import datetime, timedelta
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta="0 0 * * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 2, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 3, 1, 0, 0, tzinfo=Timezone('UTC'))]

    :param start_date: anchor date to start the series from
    :param end_date: right boundary for the date range; when neither ``end_date``
        nor ``num`` is given, ``timezone.utcnow()`` is used as the boundary
    :param num: alternatively to ``end_date``, the number of entries wanted in the
        range. A negative number steps backwards from ``start_date``; the output
        is sorted either way
    :param delta: step length, either a ``datetime.timedelta``, a ``relativedelta``,
        or a cron expression (or a key of ``cron_presets``) given as a string.
        A falsy ``delta`` yields an empty list
    :return: the generated datetimes in ascending order
    :raises ValueError: if ``start_date`` is after ``end_date``, or if both
        ``end_date`` and ``num`` are given
    :raises TypeError: if ``delta`` is none of the supported types
    """
    warnings.warn(
        "`airflow.utils.dates.date_range()` is deprecated. Please use `airflow.timetables`.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )

    # Nothing to step by: bail out before any other validation.
    if not delta:
        return []

    if end_date:
        if start_date > end_date:
            raise ValueError("Wait. start_date needs to be before end_date")
        if num:
            raise ValueError("Wait. Either specify end_date OR num")
    elif not num:
        # Neither bound supplied: run the range up to "now".
        end_date = timezone.utcnow()

    original_tz = start_date.tzinfo

    # Exactly one of ``cron`` / ``step`` is set once ``delta`` has been classified.
    cron = None
    step: timedelta | relativedelta | None = None
    if isinstance(delta, str):
        # The cron iterator is driven with a naive datetime; the original
        # timezone is re-attached when results are collected.
        if timezone.is_localized(start_date):
            start_date = timezone.make_naive(start_date, original_tz)
        cron = croniter(cron_presets.get(delta, delta), start_date)
    elif isinstance(delta, (timedelta, relativedelta)):
        step = abs(delta)
    else:
        raise TypeError("Wait. delta must be either datetime.timedelta or cron expression as str")

    def _with_original_tz(moment):
        """Return ``moment`` made aware in the start date's timezone if it is naive."""
        if timezone.is_naive(moment):
            return timezone.make_aware(moment, original_tz)
        return moment

    collected = []
    cursor = start_date
    if end_date:
        # Keep both sides of the comparison below naive or both aware.
        if timezone.is_naive(cursor) and not timezone.is_naive(end_date):
            end_date = timezone.make_naive(end_date, original_tz)
        while cursor <= end_date:  # type: ignore
            collected.append(_with_original_tz(cursor))
            cursor = cron.get_next(datetime) if cron is not None else cursor + step
    else:
        num_entries: int = num  # type: ignore
        total = abs(num_entries)
        forward = num_entries > 0
        for _ in range(total):
            collected.append(_with_original_tz(cursor))
            if cron is not None:
                cursor = cron.get_next(datetime) if forward else cron.get_prev(datetime)
            elif forward:
                cursor = cursor + step
            else:
                cursor = cursor - step

    return sorted(collected)
if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta # for doctest


def round_time(
Expand Down Expand Up @@ -256,23 +158,6 @@ def scale_time_units(time_seconds_arr: Collection[float], unit: TimeUnit) -> Col
return [x / factor for x in time_seconds_arr]


def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
    """
    Get a datetime object representing *n* days ago.

    The result is ``timezone.utcnow()`` with its hour, minute, second and
    microsecond replaced by the given values, minus ``n`` days. With the
    defaults the time of day is midnight.

    Deprecated: every call emits a ``RemovedInAirflow3Warning``.

    :param n: number of days to subtract
    :param hour: hour to set on the result
    :param minute: minute to set on the result
    :param second: second to set on the result
    :param microsecond: microsecond to set on the result
    :return: the adjusted datetime
    """
    warnings.warn(
        "Function `days_ago` is deprecated and will be removed in Airflow 3.0. "
        "You can achieve equivalent behavior with `pendulum.today('UTC').add(days=-N, ...)`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )

    time_of_day = {
        "hour": hour,
        "minute": minute,
        "second": second,
        "microsecond": microsecond,
    }
    anchor = timezone.utcnow().replace(**time_of_day)
    offset = timedelta(days=n)
    return anchor - offset


def parse_execution_date(execution_date_str):
    """
    Parse execution date string to datetime object.

    Thin wrapper that hands the string unchanged to ``timezone.parse``.

    :param execution_date_str: textual execution date, e.g. ``"2017-11-02 00:00:00"``
    :return: whatever ``timezone.parse`` produces for the given string
    """
    parsed = timezone.parse(execution_date_str)
    return parsed
Expand Down
5 changes: 5 additions & 0 deletions newsfragments/41496.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Removed deprecated functions in airflow/utils/dates.py

Functions removed:

* date_range (Use ``airflow.timetables``)
* days_ago (Use ``pendulum.today('UTC').add(days=-N, ...)``)
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@
get_launch_task_id,
get_task_instance,
)
from airflow.utils.dates import days_ago
from airflow.www.app import create_app

DAG_ID = "test_dag"
TASK_ID = "test_task"
RUN_ID = "test_run_1"
DAG_RUN_DATE = days_ago(1)
TASK_INSTANCE_KEY = TaskInstanceKey(dag_id=DAG_ID, task_id=TASK_ID, run_id=RUN_ID, try_number=1)
DATABRICKS_CONN_ID = "databricks_default"
DATABRICKS_RUN_ID = 12345
Expand Down
63 changes: 1 addition & 62 deletions tests/utils/test_dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,15 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timedelta
from datetime import timedelta

import pendulum
import pytest
from dateutil.relativedelta import relativedelta

from airflow.utils import dates, timezone


class TestDates:
@pytest.mark.filterwarnings(
"ignore:Function `days_ago` is deprecated.*:airflow.exceptions.RemovedInAirflow3Warning"
)
def test_days_ago(self):
today = pendulum.today()
today_midnight = pendulum.instance(datetime.fromordinal(today.date().toordinal()))

assert dates.days_ago(0) == today_midnight
assert dates.days_ago(100) == today_midnight + timedelta(days=-100)

assert dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3)
assert dates.days_ago(0, minute=3) == today_midnight + timedelta(minutes=3)
assert dates.days_ago(0, second=3) == today_midnight + timedelta(seconds=3)
assert dates.days_ago(0, microsecond=3) == today_midnight + timedelta(microseconds=3)

def test_parse_execution_date(self):
execution_date_str_wo_ms = "2017-11-02 00:00:00"
execution_date_str_w_ms = "2017-11-05 16:18:30.989729"
Expand Down Expand Up @@ -103,48 +87,3 @@ def test_scale_time_units(self):

arr4 = dates.scale_time_units([200000, 100000], "days")
assert arr4 == pytest.approx([2.3147, 1.1574], rel=1e-3)


@pytest.mark.filterwarnings(
    r"ignore:`airflow.utils.dates.date_range\(\)` is deprecated:airflow.exceptions.RemovedInAirflow3Warning"
)
class TestUtilsDatesDateRange:
    """Tests for ``dates.date_range``; its deprecation warning is filtered out class-wide."""

    def test_no_delta(self):
        """Without a delta there is nothing to step by, so the range is empty."""
        assert dates.date_range(datetime(2016, 1, 1), datetime(2016, 1, 3)) == []

    def test_end_date_before_start_date(self):
        """A start date later than the end date is rejected."""
        with pytest.raises(ValueError, match="Wait. start_date needs to be before end_date"):
            dates.date_range(datetime(2016, 2, 1), datetime(2016, 1, 1), delta=timedelta(seconds=1))

    def test_both_end_date_and_num_given(self):
        """``end_date`` and ``num`` are mutually exclusive."""
        with pytest.raises(ValueError, match="Wait. Either specify end_date OR num"):
            dates.date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), num=2, delta=timedelta(seconds=1))

    def test_invalid_delta(self):
        """A delta that is neither a timedelta nor a cron string raises ``TypeError``."""
        exception_msg = "Wait. delta must be either datetime.timedelta or cron expression as str"
        with pytest.raises(TypeError, match=exception_msg):
            dates.date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=1)

    def test_positive_num_given(self):
        """A positive ``num`` yields exactly that many timezone-aware entries."""
        for num in range(1, 10):
            result = dates.date_range(datetime(2016, 1, 1), num=num, delta=timedelta(1))
            assert len(result) == num

            for entry in result:
                assert timezone.is_localized(entry)

    def test_negative_num_given(self):
        """A negative ``num`` yields ``abs(num)`` timezone-aware entries."""
        # The step has to be -1 so that several negative values are exercised;
        # a step of -10 would stop after the very first value.
        for num in range(-1, -5, -1):
            result = dates.date_range(datetime(2016, 1, 1), num=num, delta=timedelta(1))
            assert len(result) == -num

            # Iterate over the results themselves: ``range(num)`` is empty for a
            # negative ``num``, which would skip these assertions entirely.
            for entry in result:
                assert timezone.is_localized(entry)

    def test_delta_cron_presets(self):
        """A preset name, its cron expression and the matching timedelta all agree."""
        preset_range = dates.date_range(datetime(2016, 1, 1), num=2, delta="@hourly")
        timedelta_range = dates.date_range(datetime(2016, 1, 1), num=2, delta=timedelta(hours=1))
        cron_range = dates.date_range(datetime(2016, 1, 1), num=2, delta="0 * * * *")

        assert preset_range == timedelta_range
        assert preset_range == cron_range