Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid pendulum.from_timestamp usage #37160

Merged
merged 1 commit into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow/providers/elasticsearch/log/es_json_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from datetime import datetime

import pendulum

from airflow.utils.log.json_formatter import JSONFormatter
Expand All @@ -30,7 +32,9 @@ class ElasticsearchJSONFormatter(JSONFormatter):

def formatTime(self, record, datefmt=None):
"""Return the creation time of the LogRecord in ISO 8601 date/time format in the local time zone."""
dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone())
# TODO: Use airflow.utils.timezone.from_timestamp(record.created, tz="local")
# as soon as min Airflow 2.9.0
dt = datetime.fromtimestamp(record.created, tz=pendulum.local_timezone())
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
Expand Down
7 changes: 3 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import attrs
import lazy_object_proxy
import pendulum
from dateutil import relativedelta
from pendulum.tz.timezone import FixedTimezone, Timezone

Expand Down Expand Up @@ -65,7 +64,7 @@
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.timezone import parse_timezone
from airflow.utils.timezone import from_timestamp, parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -567,7 +566,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
elif type_ == DAT.OP:
return SerializedBaseOperator.deserialize_operator(var)
elif type_ == DAT.DATETIME:
return pendulum.from_timestamp(var)
return from_timestamp(var)
elif type_ == DAT.POD:
if not _has_kubernetes():
raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
Expand Down Expand Up @@ -611,7 +610,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
else:
raise TypeError(f"Invalid type {type_!s} in deserialization.")

_deserialize_datetime = pendulum.from_timestamp
_deserialize_datetime = from_timestamp
_deserialize_timezone = parse_timezone

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/log/timezone_aware.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import logging

import pendulum
from airflow.utils import timezone


class TimezoneAware(logging.Formatter):
Expand All @@ -39,7 +39,7 @@ def formatTime(self, record, datefmt=None):
This returns the creation time of the specified LogRecord in ISO 8601
date and time format in the local time zone.
"""
dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone())
dt = timezone.from_timestamp(record.created, tz="local")
s = dt.strftime(datefmt or self.default_time_format)
if self.default_msec_format:
s = self.default_msec_format % (s, record.msecs)
Expand Down
22 changes: 22 additions & 0 deletions airflow/utils/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
if TYPE_CHECKING:
from pendulum.tz.timezone import FixedTimezone, Timezone

from airflow.typing_compat import Literal

_PENDULUM3 = pendulum.__version__.startswith("3")
# UTC Timezone as a tzinfo instance. Actual value depends on pendulum version:
# - Timezone("UTC") in pendulum 3
Expand Down Expand Up @@ -299,3 +301,23 @@ def local_timezone() -> FixedTimezone | Timezone:
:meta private:
"""
return pendulum.tz.local_timezone()


def from_timestamp(
timestamp: int | float, tz: str | FixedTimezone | Timezone | Literal["local"] = utc
) -> DateTime:
"""
Parse timestamp and return DateTime in a given time zone.

:param timestamp: epoch time in seconds.
:param tz: In which timezone should return a resulting object.
Could be either one of pendulum timezone, IANA timezone or `local` literal.

:meta private:
"""
result = coerce_datetime(dt.datetime.fromtimestamp(timestamp, tz=utc))
if tz != utc or tz != "UTC":
if isinstance(tz, str) and tz.lower() == "local":
tz = local_timezone()
result = result.in_timezone(tz)
return result
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,8 @@ combine-as-imports = true
"airflow.AirflowException".msg = "Use airflow.exceptions.AirflowException instead."
"airflow.Dataset".msg = "Use airflow.datasets.Dataset instead."
"airflow.models.baseoperator.BaseOperatorLink".msg = "Use airflow.models.baseoperatorlink.BaseOperatorLink"
# Uses deprecated in Python 3.12 `datetime.datetime.utcfromtimestamp`
"pendulum.from_timestamp".msg = "Use airflow.utils.timezone.from_timestamp"

[tool.ruff.flake8-tidy-imports]
# Ban certain modules from being imported at module level, instead requiring
Expand Down
46 changes: 45 additions & 1 deletion tests/utils/test_timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import pendulum
import pytest
from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone

from airflow.utils import timezone
from airflow.utils.timezone import coerce_datetime, parse_timezone
Expand Down Expand Up @@ -156,3 +156,47 @@ def test_parse_timezone_offset(tz_offset: int, expected_offset, expected_name):
assert tz.offset == expected_offset
assert tz.name == expected_name
assert parse_timezone(tz_offset) is tz


@pytest.mark.parametrize(
"tz",
[
pytest.param(None, id="implicit"),
pytest.param(timezone.utc, id="explicit"),
pytest.param("UTC", id="utc-literal"),
],
)
def test_from_timestamp_utc(tz):
from_ts = timezone.from_timestamp(0) if tz is None else timezone.from_timestamp(0, tz=tz)
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
assert from_ts.tzinfo == timezone.utc


@pytest.mark.parametrize("tz", ["local", "LOCAL"])
def test_from_timestamp_local(tz):
local_tz = timezone.local_timezone()
from_ts = timezone.from_timestamp(0, tz=tz)
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
assert from_ts.tzinfo == local_tz


@pytest.mark.parametrize(
"tz, iana_timezone",
[
pytest.param(Timezone("Europe/Paris"), "Europe/Paris", id="pendulum-timezone"),
pytest.param("America/New_York", "America/New_York", id="IANA-timezone"),
],
)
def test_from_timestamp_iana_timezones(tz, iana_timezone):
from_ts = timezone.from_timestamp(0, tz=tz)
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
# In pendulum 2 there is a problem with compare tzinfo object (caching?), so we check the name
assert from_ts.tzinfo.name == iana_timezone
assert isinstance(from_ts.tzinfo, Timezone)


@pytest.mark.parametrize("utc_offset", [3600, -7200])
def test_from_timestamp_fixed_timezone(utc_offset):
from_ts = timezone.from_timestamp(0, tz=FixedTimezone(utc_offset))
assert from_ts == pendulum.DateTime(1970, 1, 1, tzinfo=timezone.utc)
assert from_ts.utcoffset() == datetime.timedelta(seconds=utc_offset)