Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule timezone fixes #3315

Merged
merged 11 commits into from
Jan 27, 2025
8 changes: 2 additions & 6 deletions src/zenml/analytics/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
The base functionalities are adapted to work with the ZenML analytics server.
"""

import datetime
import locale
from types import TracebackType
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, Union
Expand All @@ -32,6 +31,7 @@
)
from zenml.environment import Environment, get_environment
from zenml.logger import get_logger
from zenml.utils.time_utils import utc_now_tz_aware

if TYPE_CHECKING:
from zenml.analytics.enums import AnalyticsEvent
Expand Down Expand Up @@ -284,11 +284,7 @@ def track(

try:
# Timezone as tzdata
tz = (
datetime.datetime.now(datetime.timezone.utc)
.astimezone()
.tzname()
)
tz = utc_now_tz_aware().astimezone().tzname()
if tz is not None:
properties.update({"timezone": tz})

Expand Down
10 changes: 5 additions & 5 deletions src/zenml/cli/service_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# permissions and limitations under the License.
"""Service connector CLI commands."""

from datetime import datetime, timezone
from datetime import datetime
from typing import Any, Dict, List, Optional, Union, cast
from uuid import UUID

Expand All @@ -25,7 +25,6 @@
is_sorted_or_filtered,
list_options,
print_page_info,
seconds_to_human_readable,
)
from zenml.client import Client
from zenml.console import console
Expand All @@ -37,6 +36,7 @@
ServiceConnectorResourcesModel,
ServiceConnectorResponse,
)
from zenml.utils.time_utils import seconds_to_human_readable, utc_now


# Service connectors
Expand Down Expand Up @@ -292,7 +292,7 @@ def prompt_expires_at(
default_str = ""
if default is not None:
seconds = int(
(default - datetime.now(timezone.utc)).total_seconds()
(default - utc_now(tz_aware=default)).total_seconds()
)
default_str = (
f" [{str(default)} i.e. in "
Expand All @@ -309,15 +309,15 @@ def prompt_expires_at(

assert expires_at is not None
assert isinstance(expires_at, datetime)
if expires_at < datetime.now(timezone.utc):
if expires_at < utc_now(tz_aware=expires_at):
cli_utils.warning(
"The expiration time must be in the future. Please enter a "
"later date and time."
)
continue

seconds = int(
(expires_at - datetime.now(timezone.utc)).total_seconds()
(expires_at - utc_now(tz_aware=expires_at)).total_seconds()
)

confirm = click.confirm(
Expand Down
4 changes: 2 additions & 2 deletions src/zenml/cli/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import re
import time
import webbrowser
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -77,6 +76,7 @@
)
from zenml.utils import requirements_utils
from zenml.utils.dashboard_utils import get_component_url, get_stack_url
from zenml.utils.time_utils import utc_now_tz_aware
from zenml.utils.yaml_utils import read_yaml, write_yaml

if TYPE_CHECKING:
Expand Down Expand Up @@ -1575,7 +1575,7 @@ def deploy(
):
raise click.Abort()

date_start = datetime.now(timezone.utc)
date_start = utc_now_tz_aware()

webbrowser.open(deployment_config.deployment_url)
console.print(
Expand Down
54 changes: 1 addition & 53 deletions src/zenml/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""Utility functions for the CLI."""

import contextlib
import datetime
import json
import os
import platform
Expand Down Expand Up @@ -79,6 +78,7 @@
from zenml.stack import StackComponent
from zenml.stack.stack_component import StackComponentConfig
from zenml.utils import secret_utils
from zenml.utils.time_utils import expires_in

if TYPE_CHECKING:
from uuid import UUID
Expand Down Expand Up @@ -1581,58 +1581,6 @@ def print_components_table(
print_table(configurations)


def seconds_to_human_readable(time_seconds: int) -> str:
"""Converts seconds to human-readable format.

Args:
time_seconds: Seconds to convert.

Returns:
Human readable string.
"""
seconds = time_seconds % 60
minutes = (time_seconds // 60) % 60
hours = (time_seconds // 3600) % 24
days = time_seconds // 86400
tokens = []
if days:
tokens.append(f"{days}d")
if hours:
tokens.append(f"{hours}h")
if minutes:
tokens.append(f"{minutes}m")
if seconds:
tokens.append(f"{seconds}s")

return "".join(tokens)


def expires_in(
expires_at: datetime.datetime,
expired_str: str,
skew_tolerance: Optional[int] = None,
) -> str:
"""Returns a human-readable string of the time until the token expires.

Args:
expires_at: Datetime object of the token expiration.
expired_str: String to return if the token is expired.
skew_tolerance: Seconds of skew tolerance to subtract from the
expiration time. If the token expires within this time, it will be
considered expired.

Returns:
Human readable string.
"""
now = datetime.datetime.now(datetime.timezone.utc)
expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
if skew_tolerance:
expires_at -= datetime.timedelta(seconds=skew_tolerance)
if expires_at < now:
return expired_str
return seconds_to_human_readable(int((expires_at - now).total_seconds()))


def print_service_connectors_table(
client: "Client",
connectors: Sequence["ServiceConnectorResponse"],
Expand Down
5 changes: 3 additions & 2 deletions src/zenml/config/pipeline_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# permissions and limitations under the License.
"""Pipeline configuration classes."""

from datetime import datetime, timezone
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from pydantic import SerializeAsAny, field_validator
Expand All @@ -23,6 +23,7 @@
from zenml.config.source import SourceWithValidator
from zenml.config.strict_base_model import StrictBaseModel
from zenml.model.model import Model
from zenml.utils.time_utils import utc_now

if TYPE_CHECKING:
from zenml.config import DockerSettings
Expand Down Expand Up @@ -61,7 +62,7 @@ def _get_full_substitutions(
The full substitutions dict including date and time.
"""
if start_time is None:
start_time = datetime.now(timezone.utc)
start_time = utc_now()
ret = self.substitutions.copy()
ret.setdefault("date", start_time.strftime("%Y_%m_%d"))
ret.setdefault("time", start_time.strftime("%H_%M_%S_%f"))
Expand Down
80 changes: 46 additions & 34 deletions src/zenml/config/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
# permissions and limitations under the License.
"""Class for defining a pipeline schedule."""

import datetime
from datetime import datetime, timedelta
from typing import Optional

from pydantic import BaseModel, model_validator
from pydantic import (
BaseModel,
ValidationInfo,
field_validator,
model_validator,
)

from zenml.logger import get_logger

Expand All @@ -32,8 +37,12 @@ class Schedule(BaseModel):
and time.
cron_expression: Cron expression for the pipeline schedule. If a value
for this is set it takes precedence over the start time + interval.
start_time: datetime object to indicate when to start the schedule.
end_time: datetime object to indicate when to end the schedule.
start_time: When the schedule should start. If this is a datetime object
without any timezone, it is treated as a datetime in the local
timezone.
end_time: When the schedule should end. If this is a datetime object
without any timezone, it is treated as a datetime in the local
timezone.
interval_second: datetime timedelta indicating the seconds between two
recurring runs for a periodic schedule.
catchup: Whether the recurring run should catch up if behind schedule.
Expand All @@ -43,17 +52,44 @@ class Schedule(BaseModel):
schedules the latest interval if more than one interval is ready to
be scheduled. Usually, if your pipeline handles backfill
internally, you should turn catchup off to avoid duplicate backfill.
run_once_start_time: datetime object to indicate when to run the
pipeline once. This is useful for one-off runs.
run_once_start_time: When to run the pipeline once. If this is a
datetime object without any timezone, it is treated as a datetime
in the local timezone.
"""

name: Optional[str] = None
cron_expression: Optional[str] = None
start_time: Optional[datetime.datetime] = None
end_time: Optional[datetime.datetime] = None
interval_second: Optional[datetime.timedelta] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
interval_second: Optional[timedelta] = None
catchup: bool = False
run_once_start_time: Optional[datetime.datetime] = None
run_once_start_time: Optional[datetime] = None

@field_validator(
"start_time", "end_time", "run_once_start_time", mode="after"
)
@classmethod
def _ensure_timezone(
cls, value: Optional[datetime], info: ValidationInfo
) -> Optional[datetime]:
"""Ensures that all datetimes are timezone aware.

Args:
value: The datetime.
info: The validation info.

Returns:
A timezone aware datetime or None.
"""
if value and value.tzinfo is None:
assert info.field_name
logger.warning(
"Your schedule `%s` is missing a timezone. It will be treated "
"as a datetime in your local timezone."
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.warning(
"Your schedule `%s` is missing a timezone. It will be treated "
"as a datetime in your local timezone."
)
logger.warning(
f"Your schedule `{info.field_name}` is missing a timezone. It will be treated "
"as a datetime in your local timezone."
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://google.github.io/styleguide/pyguide.html#3101-logging

For logging functions that expect a pattern-string (with %-placeholders) as their first argument: Always call them with a string literal (not an f-string!) as their first argument with pattern-parameters as subsequent arguments.

No :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're sort of following this style guide for our code base, but even more so, the argument of filtering log messages makes a lot of sense I think. Right now we're just logging to stdout, but we've had multiple discussions already and I'm sure at some point we will allow customizing some logging backends where ZenML logs to. So we should just write our logs with these placeholders, to be prepared for when (not if) this is being implemented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, Michael, but you're still missing the argument, heee heee

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that I fixed, thanks 🙏

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Your schedule %s is missing a timezone" comment is missing a schedule 😄

value = value.astimezone()

return value

@model_validator(mode="after")
def _ensure_cron_or_periodic_schedule_configured(self) -> "Schedule":
Expand Down Expand Up @@ -98,27 +134,3 @@ def _ensure_cron_or_periodic_schedule_configured(self) -> "Schedule":
"or a run once start time "
"need to be set for a valid schedule."
)

@property
def utc_start_time(self) -> Optional[str]:
"""Optional ISO-formatted string of the UTC start time.

Returns:
Optional ISO-formatted string of the UTC start time.
"""
if not self.start_time:
return None

return self.start_time.astimezone(datetime.timezone.utc).isoformat()

@property
def utc_end_time(self) -> Optional[str]:
"""Optional ISO-formatted string of the UTC end time.

Returns:
Optional ISO-formatted string of the UTC end time.
"""
if not self.end_time:
return None

return self.end_time.astimezone(datetime.timezone.utc).isoformat()
7 changes: 3 additions & 4 deletions src/zenml/event_hub/base_event_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""Base class for event hub implementations."""

from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple

from zenml import EventSourceResponse
Expand All @@ -28,6 +28,7 @@
TriggerExecutionResponse,
TriggerResponse,
)
from zenml.utils.time_utils import utc_now
from zenml.zen_server.auth import AuthContext
from zenml.zen_server.jwt import JWTToken

Expand Down Expand Up @@ -134,9 +135,7 @@ def trigger_action(
)
expires: Optional[datetime] = None
if trigger.action.auth_window:
expires = datetime.now(timezone.utc) + timedelta(
minutes=trigger.action.auth_window
)
expires = utc_now() + timedelta(minutes=trigger.action.auth_window)
encoded_token = token.encode(expires=expires)
auth_context = AuthContext(
user=trigger.action.service_account,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from zenml.orchestrators.utils import get_orchestrator_run_name
from zenml.stack import StackValidator
from zenml.utils import io_utils
from zenml.utils.time_utils import utc_now

if TYPE_CHECKING:
from zenml.config import ResourceSettings
Expand Down Expand Up @@ -408,8 +409,7 @@ def _translate_schedule(
if schedule:
if schedule.cron_expression:
start_time = schedule.start_time or (
datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(7)
utc_now() - datetime.timedelta(7)
)
return {
"schedule": schedule.cron_expression,
Expand All @@ -429,7 +429,6 @@ def _translate_schedule(
"schedule": "@once",
# set a start time in the past and disable catchup so airflow
# runs the dag immediately
"start_date": datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(7),
"start_date": utc_now() - datetime.timedelta(7),
"catchup": False,
}
Loading
Loading