diff --git a/airflow/providers/elasticsearch/log/es_json_formatter.py b/airflow/providers/elasticsearch/log/es_json_formatter.py index cf77896a9218e..49e3f54a50908 100644 --- a/airflow/providers/elasticsearch/log/es_json_formatter.py +++ b/airflow/providers/elasticsearch/log/es_json_formatter.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from datetime import datetime + import pendulum from airflow.utils.log.json_formatter import JSONFormatter @@ -30,7 +32,9 @@ class ElasticsearchJSONFormatter(JSONFormatter): def formatTime(self, record, datefmt=None): """Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone.""" - dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone()) + # TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local") + # as soon as min Airflow 2.9.0 + dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone()) s = dt.strftime(datefmt or self.default_time_format) if self.default_msec_format: s = self.default_msec_format % (s, record.msecs) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index e77c43f5f50c2..5ec07e4f6c1a9 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -30,7 +30,6 @@ import attrs import lazy_object_proxy -import pendulum from dateutil import relativedelta from pendulum.tz.timezone import FixedTimezone, Timezone @@ -65,7 +64,7 @@ from airflow.utils.module_loading import import_string, qualname from airflow.utils.operator_resources import Resources from airflow.utils.task_group import MappedTaskGroup, TaskGroup -from airflow.utils.timezone import parse_timezone +from airflow.utils.timezone import from_timestamp, parse_timezone from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: @@ -567,7 +566,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: elif type_ == DAT.OP: return SerializedBaseOperator.deserialize_operator(var) elif type_ == DAT.DATETIME: - return pendulum.from_timestamp(var) + return from_timestamp(var) elif type_ == DAT.POD: if not _has_kubernetes(): raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!") @@ -611,7 +610,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any: else: raise TypeError(f"Invalid type {type_!s} in deserialization.") - _deserialize_datetime = pendulum.from_timestamp + _deserialize_datetime = from_timestamp _deserialize_timezone = parse_timezone @classmethod diff --git a/airflow/utils/log/timezone_aware.py b/airflow/utils/log/timezone_aware.py index ae96a11116a1f..4ca35480a1bd6 100644 --- a/airflow/utils/log/timezone_aware.py +++ b/airflow/utils/log/timezone_aware.py @@ -18,7 +18,7 @@ import logging -import pendulum +from airflow.utils import timezone class TimezoneAware(logging.Formatter): @@ -39,7 +39,7 @@ def formatTime(self, record, datefmt=None): This returns the creation time of the specified LogRecord in ISO 8601 date and time format in the local time zone. """ - dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone()) + dt = timezone.from_timestamp(record.created, tz="local") s = dt.strftime(datefmt or self.default_time_format) if self.default_msec_format: s = self.default_msec_format % (s, record.msecs) diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py index 152ef359542e9..966c4bbdc1a15 100644 --- a/airflow/utils/timezone.py +++ b/airflow/utils/timezone.py @@ -27,6 +27,8 @@ if TYPE_CHECKING: from pendulum.tz.timezone import FixedTimezone, Timezone + from airflow.typing_compat import Literal + _PENDULUM3 = pendulum.__version__.startswith("3") # UTC Timezone as a tzinfo instance. Actual value depends on pendulum version: # - Timezone("UTC") in pendulum 3 @@ -299,3 +301,23 @@ def local_timezone() -> FixedTimezone | Timezone: :meta private: """ return pendulum.tz.local_timezone() + + +def from_timestamp( + timestamp: int | float, tz: str | FixedTimezone | Timezone | Literal["local"] = utc +) -> DateTime: + """ + Parse timestamp and return DateTime in a given time zone. + + :param timestamp: epoch time in seconds. + :param tz: In which timezone should return a resulting object. + Could be either one of pendulum timezone, IANA timezone or `local` literal. + + :meta private: + """ + result = coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc)) + if tz != utc or tz != "UTC": + if isinstance(tz, str) and tz.lower() == "local": + tz = local_timezone() + result = result.in_timezone(tz) + return result diff --git a/pyproject.toml b/pyproject.toml index c083fe9a19fc5..beb89064a5a01 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1372,6 +1372,8 @@ combine-as-imports = true "airflow.AirflowException".msg = "Use airflow.exceptions.AirflowException instead." "airflow.Dataset".msg = "Use airflow.datasets.Dataset instead." "airflow.models.baseoperator.BaseOperatorLink".msg = "Use airflow.models.baseoperatorlink.BaseOperatorLink" +# Uses deprecated in Python 3.12 `datetime.datetime.utcfromtimestamp` +"pendulum.from_timestamp".msg = "Use airflow.utils.timezone.from_timestamp" [tool.ruff.flake8-tidy-imports] # Ban certain modules from being imported at module level, instead requiring diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py index df8af04604234..c7e6d0c09aa81 100644 --- a/tests/utils/test_timezone.py +++ b/tests/utils/test_timezone.py @@ -21,7 +21,7 @@ import pendulum import pytest -from pendulum.tz.timezone import Timezone +from pendulum.tz.timezone import FixedTimezone, Timezone from airflow.utils import timezone from airflow.utils.timezone import coerce_datetime, parse_timezone @@ -156,3 +156,47 @@ def test_parse_timezone_offset(tz_offset: int, expected_offset, expected_name): assert tz.offset == expected_offset assert tz.name == expected_name assert parse_timezone(tz_offset) is tz + + +@pytest.mark.parametrize( + "tz", + [ + pytest.param(None, id="implicit"), + pytest.param(timezone.utc, id="explicit"), + pytest.param("UTC", id="utc-literal"), + ], +) +def test_from_timestamp_utc(tz): + from_ts = timezone.from_timestamp(0) if tz is None else timezone.from_timestamp(0, tz=tz) + assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc) + assert from_ts.tzinfo == timezone.utc + + +@pytest.mark.parametrize("tz", ["local", "LOCAL"]) +def test_from_timestamp_local(tz): + local_tz = timezone.local_timezone() + from_ts = timezone.from_timestamp(0, tz=tz) + assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc) + assert from_ts.tzinfo == local_tz + + +@pytest.mark.parametrize( + "tz, iana_timezone", + [ + pytest.param(Timezone("Europe/Paris"), "Europe/Paris", id="pendulum-timezone"), + pytest.param("America/New_York", "America/New_York", id="IANA-timezone"), + ], +) +def test_from_timestamp_iana_timezones(tz, iana_timezone): + from_ts = timezone.from_timestamp(0, tz=tz) + assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc) + # In pendulum 2 there is a problem with compare tzinfo object (caching?), so we check the name + assert from_ts.tzinfo.name == iana_timezone + assert isinstance(from_ts.tzinfo, Timezone) + + +@pytest.mark.parametrize("utc_offset", [3600, -7200]) +def test_from_timestamp_fixed_timezone(utc_offset): + from_ts = timezone.from_timestamp(0, tz=FixedTimezone(utc_offset)) + assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc) + assert from_ts.utcoffset() == datetime.timedelta(seconds=utc_offset)