Skip to content

Commit

Permalink
Merge branch 'develop' into doc/addbrokenlinkschecker
Browse files Browse the repository at this point in the history
  • Loading branch information
htahir1 authored Jan 24, 2025
2 parents aecd5fd + ed1a2b6 commit 5e2bda6
Show file tree
Hide file tree
Showing 65 changed files with 381 additions and 324 deletions.
4 changes: 2 additions & 2 deletions docs/book/component-guide/orchestrators/sagemaker.md
Original file line number Diff line number Diff line change
Expand Up @@ -382,14 +382,14 @@ from datetime import datetime, timedelta
from zenml import pipeline
from zenml.config.schedule import Schedule

# Using a cron expression (runs daily at 2 AM UTC)
# Using a cron expression (runs every 5 minutes)
@pipeline
def my_scheduled_pipeline():
# Your pipeline steps here
pass

my_scheduled_pipeline.with_options(
schedule=Schedule(cron_expression="0 2 * * *")
schedule=Schedule(cron_expression="0/5 * * * ? *")
)()

# Using an interval (runs every 2 hours)
Expand Down
8 changes: 2 additions & 6 deletions src/zenml/analytics/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
The base functionalities are adapted to work with the ZenML analytics server.
"""

import datetime
import locale
from types import TracebackType
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Type, Union
Expand All @@ -32,6 +31,7 @@
)
from zenml.environment import Environment, get_environment
from zenml.logger import get_logger
from zenml.utils.time_utils import utc_now_tz_aware

if TYPE_CHECKING:
from zenml.analytics.enums import AnalyticsEvent
Expand Down Expand Up @@ -284,11 +284,7 @@ def track(

try:
# Timezone as tzdata
tz = (
datetime.datetime.now(datetime.timezone.utc)
.astimezone()
.tzname()
)
tz = utc_now_tz_aware().astimezone().tzname()
if tz is not None:
properties.update({"timezone": tz})

Expand Down
10 changes: 5 additions & 5 deletions src/zenml/cli/service_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# permissions and limitations under the License.
"""Service connector CLI commands."""

from datetime import datetime, timezone
from datetime import datetime
from typing import Any, Dict, List, Optional, Union, cast
from uuid import UUID

Expand All @@ -25,7 +25,6 @@
is_sorted_or_filtered,
list_options,
print_page_info,
seconds_to_human_readable,
)
from zenml.client import Client
from zenml.console import console
Expand All @@ -37,6 +36,7 @@
ServiceConnectorResourcesModel,
ServiceConnectorResponse,
)
from zenml.utils.time_utils import seconds_to_human_readable, utc_now


# Service connectors
Expand Down Expand Up @@ -292,7 +292,7 @@ def prompt_expires_at(
default_str = ""
if default is not None:
seconds = int(
(default - datetime.now(timezone.utc)).total_seconds()
(default - utc_now(tz_aware=default)).total_seconds()
)
default_str = (
f" [{str(default)} i.e. in "
Expand All @@ -309,15 +309,15 @@ def prompt_expires_at(

assert expires_at is not None
assert isinstance(expires_at, datetime)
if expires_at < datetime.now(timezone.utc):
if expires_at < utc_now(tz_aware=expires_at):
cli_utils.warning(
"The expiration time must be in the future. Please enter a "
"later date and time."
)
continue

seconds = int(
(expires_at - datetime.now(timezone.utc)).total_seconds()
(expires_at - utc_now(tz_aware=expires_at)).total_seconds()
)

confirm = click.confirm(
Expand Down
4 changes: 2 additions & 2 deletions src/zenml/cli/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import re
import time
import webbrowser
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -77,6 +76,7 @@
)
from zenml.utils import requirements_utils
from zenml.utils.dashboard_utils import get_component_url, get_stack_url
from zenml.utils.time_utils import utc_now_tz_aware
from zenml.utils.yaml_utils import read_yaml, write_yaml

if TYPE_CHECKING:
Expand Down Expand Up @@ -1575,7 +1575,7 @@ def deploy(
):
raise click.Abort()

date_start = datetime.now(timezone.utc)
date_start = utc_now_tz_aware()

webbrowser.open(deployment_config.deployment_url)
console.print(
Expand Down
54 changes: 1 addition & 53 deletions src/zenml/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""Utility functions for the CLI."""

import contextlib
import datetime
import json
import os
import platform
Expand Down Expand Up @@ -79,6 +78,7 @@
from zenml.stack import StackComponent
from zenml.stack.stack_component import StackComponentConfig
from zenml.utils import secret_utils
from zenml.utils.time_utils import expires_in

if TYPE_CHECKING:
from uuid import UUID
Expand Down Expand Up @@ -1581,58 +1581,6 @@ def print_components_table(
print_table(configurations)


def seconds_to_human_readable(time_seconds: int) -> str:
"""Converts seconds to human-readable format.
Args:
time_seconds: Seconds to convert.
Returns:
Human readable string.
"""
seconds = time_seconds % 60
minutes = (time_seconds // 60) % 60
hours = (time_seconds // 3600) % 24
days = time_seconds // 86400
tokens = []
if days:
tokens.append(f"{days}d")
if hours:
tokens.append(f"{hours}h")
if minutes:
tokens.append(f"{minutes}m")
if seconds:
tokens.append(f"{seconds}s")

return "".join(tokens)


def expires_in(
expires_at: datetime.datetime,
expired_str: str,
skew_tolerance: Optional[int] = None,
) -> str:
"""Returns a human-readable string of the time until the token expires.
Args:
expires_at: Datetime object of the token expiration.
expired_str: String to return if the token is expired.
skew_tolerance: Seconds of skew tolerance to subtract from the
expiration time. If the token expires within this time, it will be
considered expired.
Returns:
Human readable string.
"""
now = datetime.datetime.now(datetime.timezone.utc)
expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
if skew_tolerance:
expires_at -= datetime.timedelta(seconds=skew_tolerance)
if expires_at < now:
return expired_str
return seconds_to_human_readable(int((expires_at - now).total_seconds()))


def print_service_connectors_table(
client: "Client",
connectors: Sequence["ServiceConnectorResponse"],
Expand Down
5 changes: 3 additions & 2 deletions src/zenml/config/pipeline_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# permissions and limitations under the License.
"""Pipeline configuration classes."""

from datetime import datetime, timezone
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from pydantic import SerializeAsAny, field_validator
Expand All @@ -23,6 +23,7 @@
from zenml.config.source import SourceWithValidator
from zenml.config.strict_base_model import StrictBaseModel
from zenml.model.model import Model
from zenml.utils.time_utils import utc_now

if TYPE_CHECKING:
from zenml.config import DockerSettings
Expand Down Expand Up @@ -61,7 +62,7 @@ def _get_full_substitutions(
The full substitutions dict including date and time.
"""
if start_time is None:
start_time = datetime.now(timezone.utc)
start_time = utc_now()
ret = self.substitutions.copy()
ret.setdefault("date", start_time.strftime("%Y_%m_%d"))
ret.setdefault("time", start_time.strftime("%H_%M_%S_%f"))
Expand Down
24 changes: 0 additions & 24 deletions src/zenml/config/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,3 @@ def _ensure_cron_or_periodic_schedule_configured(self) -> "Schedule":
"or a run once start time "
"need to be set for a valid schedule."
)

@property
def utc_start_time(self) -> Optional[str]:
"""Optional ISO-formatted string of the UTC start time.
Returns:
Optional ISO-formatted string of the UTC start time.
"""
if not self.start_time:
return None

return self.start_time.astimezone(datetime.timezone.utc).isoformat()

@property
def utc_end_time(self) -> Optional[str]:
"""Optional ISO-formatted string of the UTC end time.
Returns:
Optional ISO-formatted string of the UTC end time.
"""
if not self.end_time:
return None

return self.end_time.astimezone(datetime.timezone.utc).isoformat()
7 changes: 3 additions & 4 deletions src/zenml/event_hub/base_event_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""Base class for event hub implementations."""

from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple

from zenml import EventSourceResponse
Expand All @@ -28,6 +28,7 @@
TriggerExecutionResponse,
TriggerResponse,
)
from zenml.utils.time_utils import utc_now
from zenml.zen_server.auth import AuthContext
from zenml.zen_server.jwt import JWTToken

Expand Down Expand Up @@ -134,9 +135,7 @@ def trigger_action(
)
expires: Optional[datetime] = None
if trigger.action.auth_window:
expires = datetime.now(timezone.utc) + timedelta(
minutes=trigger.action.auth_window
)
expires = utc_now() + timedelta(minutes=trigger.action.auth_window)
encoded_token = token.encode(expires=expires)
auth_context = AuthContext(
user=trigger.action.service_account,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from zenml.orchestrators.utils import get_orchestrator_run_name
from zenml.stack import StackValidator
from zenml.utils import io_utils
from zenml.utils.time_utils import utc_now

if TYPE_CHECKING:
from zenml.config import ResourceSettings
Expand Down Expand Up @@ -408,8 +409,7 @@ def _translate_schedule(
if schedule:
if schedule.cron_expression:
start_time = schedule.start_time or (
datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(7)
utc_now() - datetime.timedelta(7)
)
return {
"schedule": schedule.cron_expression,
Expand All @@ -429,7 +429,6 @@ def _translate_schedule(
"schedule": "@once",
# set a start time in the past and disable catchup so airflow
# runs the dag immediately
"start_date": datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(7),
"start_date": utc_now() - datetime.timedelta(7),
"catchup": False,
}
20 changes: 11 additions & 9 deletions src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import os
import re
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -64,6 +63,7 @@
from zenml.orchestrators.utils import get_orchestrator_run_name
from zenml.stack import StackValidator
from zenml.utils.env_utils import split_environment_variables
from zenml.utils.time_utils import to_utc_timezone, utc_now_tz_aware

if TYPE_CHECKING:
from zenml.models import PipelineDeploymentResponse, PipelineRunResponse
Expand Down Expand Up @@ -522,6 +522,11 @@ def prepare_or_run_pipeline(

schedule_name = orchestrator_run_name
next_execution = None
start_date = (
to_utc_timezone(deployment.schedule.start_time)
if deployment.schedule.start_time
else None
)

# Create PipelineSchedule based on schedule type
if deployment.schedule.cron_expression:
Expand All @@ -531,7 +536,7 @@ def prepare_or_run_pipeline(
schedule = PipelineSchedule(
name=schedule_name,
cron=cron_exp,
start_date=deployment.schedule.start_time,
start_date=start_date,
enabled=True,
)
elif deployment.schedule.interval_second:
Expand All @@ -549,12 +554,11 @@ def prepare_or_run_pipeline(
schedule = PipelineSchedule(
name=schedule_name,
rate=(minutes, "minutes"),
start_date=deployment.schedule.start_time,
start_date=start_date,
enabled=True,
)
next_execution = (
deployment.schedule.start_time
or datetime.now(timezone.utc)
deployment.schedule.start_time or utc_now_tz_aware()
) + deployment.schedule.interval_second
else:
# One-time schedule
Expand All @@ -569,7 +573,7 @@ def prepare_or_run_pipeline(
)
schedule = PipelineSchedule(
name=schedule_name,
at=execution_time.astimezone(timezone.utc),
at=to_utc_timezone(execution_time),
enabled=True,
)
next_execution = execution_time
Expand Down Expand Up @@ -722,15 +726,13 @@ def get_pipeline_run_metadata(
Returns:
A dictionary of metadata.
"""
from zenml import get_step_context

execution_arn = os.environ[ENV_ZENML_SAGEMAKER_RUN_ID]

run_metadata: Dict[str, "MetadataType"] = {}

settings = cast(
SagemakerOrchestratorSettings,
self.get_settings(get_step_context().pipeline_run),
self.get_settings(Client().get_pipeline_run(run_id)),
)

for metadata in self.compute_metadata(
Expand Down
Loading

0 comments on commit 5e2bda6

Please sign in to comment.