diff --git a/src/prefect/blocks/abstract.py b/src/prefect/blocks/abstract.py index 1cf1c077d44d..83df38300aa6 100644 --- a/src/prefect/blocks/abstract.py +++ b/src/prefect/blocks/abstract.py @@ -1,18 +1,44 @@ from abc import ABC, abstractmethod from logging import Logger from pathlib import Path -from typing import Any, BinaryIO, Dict, Generic, List, Tuple, TypeVar, Union +from typing import Any, BinaryIO, Dict, Generic, List, Optional, Tuple, TypeVar, Union from typing_extensions import Self -from prefect import get_run_logger from prefect.blocks.core import Block from prefect.exceptions import MissingContextError -from prefect.logging.loggers import get_logger +from prefect.logging.loggers import get_logger, get_run_logger T = TypeVar("T") +class NotificationBlock(Block, ABC): + """ + Block that represents a resource in an external system that is able to send notifications. + """ + + _block_schema_capabilities = ["notify"] + + @property + def logger(self): + """ + Returns a logger based on whether the JobRun + is called from within a flow or task run context. + If a run context is present, the logger property returns a run logger. + Else, it returns a default logger labeled with the class's name. + """ + try: + return get_run_logger() + except MissingContextError: + return get_logger(self.__class__.__name__) + + @abstractmethod + async def notify(self, body: str, subject: Optional[str] = None) -> None: + """ + Send a notification. + """ + + class JobRun(ABC, Generic[T]): # not a block """ Represents a job run in an external system. Allows waiting diff --git a/src/prefect/blocks/notifications.py b/src/prefect/blocks/notifications.py index 1819801d550b..a86c2cee8c6a 100644 --- a/src/prefect/blocks/notifications.py +++ b/src/prefect/blocks/notifications.py @@ -1,4 +1,4 @@ -from abc import ABC, abstractmethod +from abc import ABC from typing import Dict, List, Optional import apprise @@ -9,24 +9,10 @@ from pydantic import AnyHttpUrl, Field, SecretStr from typing_extensions import Literal -from prefect.blocks.core import Block +from prefect.blocks.abstract import NotificationBlock from prefect.utilities.asyncutils import sync_compatible -class NotificationBlock(Block, ABC): - """ - A `Block` base class for sending notifications. - """ - - _block_schema_capabilities = ["notify"] - - @abstractmethod - async def notify(self, body: str, subject: Optional[str] = None): - """ - Send a notification - """ - - class PrefectNotifyType(NotifyType): """ A mapping of Prefect notification types for use with Apprise. diff --git a/tests/blocks/test_abstract.py b/tests/blocks/test_abstract.py index 81db614ed7cc..d92e541b9869 100644 --- a/tests/blocks/test_abstract.py +++ b/tests/blocks/test_abstract.py @@ -1,9 +1,38 @@ import pytest -from prefect.blocks.abstract import DatabaseBlock, JobBlock, JobRun, ObjectStorageBlock +from prefect.blocks.abstract import ( + DatabaseBlock, + JobBlock, + JobRun, + NotificationBlock, + ObjectStorageBlock, +) from prefect.exceptions import PrefectException +class TestNotificationBlock: + def test_notification_block_is_abstract(self): + with pytest.raises( + TypeError, match="Can't instantiate abstract class NotificationBlock" + ): + NotificationBlock() + + def test_notification_block_implementation(self, caplog): + class ANotificationBlock(NotificationBlock): + def notify(self, body, subject=None): + self.logger.info(f"Notification sent with {body} {subject}.") + + a_notification_block = ANotificationBlock() + a_notification_block.notify("body", "subject") + + # test logging + assert hasattr(a_notification_block, "logger") + assert len(caplog.records) == 1 + record = caplog.records[0] + assert record.name == "prefect.ANotificationBlock" + assert record.msg == "Notification sent with body subject." + + class JobRunIsRunning(PrefectException): """Raised when a job run is still running."""