Skip to content

Commit

Permalink
Migrates notification block (#7881)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexander Streed <[email protected]>
  • Loading branch information
ahuang11 and desertaxle authored Dec 15, 2022
1 parent c5122a5 commit 3aa447a
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 20 deletions.
32 changes: 29 additions & 3 deletions src/prefect/blocks/abstract.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,44 @@
from abc import ABC, abstractmethod
from logging import Logger
from pathlib import Path
from typing import Any, BinaryIO, Dict, Generic, List, Tuple, TypeVar, Union
from typing import Any, BinaryIO, Dict, Generic, List, Optional, Tuple, TypeVar, Union

from typing_extensions import Self

from prefect import get_run_logger
from prefect.blocks.core import Block
from prefect.exceptions import MissingContextError
from prefect.logging.loggers import get_logger
from prefect.logging.loggers import get_logger, get_run_logger

T = TypeVar("T")


class NotificationBlock(Block, ABC):
"""
Block that represents a resource in an external system that is able to send notifications.
"""

_block_schema_capabilities = ["notify"]

@property
def logger(self):
"""
Returns a logger based on whether the JobRun
is called from within a flow or task run context.
If a run context is present, the logger property returns a run logger.
Else, it returns a default logger labeled with the class's name.
"""
try:
return get_run_logger()
except MissingContextError:
return get_logger(self.__class__.__name__)

@abstractmethod
async def notify(self, body: str, subject: Optional[str] = None) -> None:
"""
Send a notification.
"""


class JobRun(ABC, Generic[T]): # not a block
"""
Represents a job run in an external system. Allows waiting
Expand Down
18 changes: 2 additions & 16 deletions src/prefect/blocks/notifications.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC, abstractmethod
from abc import ABC
from typing import Dict, List, Optional

import apprise
Expand All @@ -9,24 +9,10 @@
from pydantic import AnyHttpUrl, Field, SecretStr
from typing_extensions import Literal

from prefect.blocks.core import Block
from prefect.blocks.abstract import NotificationBlock
from prefect.utilities.asyncutils import sync_compatible


class NotificationBlock(Block, ABC):
"""
A `Block` base class for sending notifications.
"""

_block_schema_capabilities = ["notify"]

@abstractmethod
async def notify(self, body: str, subject: Optional[str] = None):
"""
Send a notification
"""


class PrefectNotifyType(NotifyType):
"""
A mapping of Prefect notification types for use with Apprise.
Expand Down
31 changes: 30 additions & 1 deletion tests/blocks/test_abstract.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,38 @@
import pytest

from prefect.blocks.abstract import DatabaseBlock, JobBlock, JobRun, ObjectStorageBlock
from prefect.blocks.abstract import (
DatabaseBlock,
JobBlock,
JobRun,
NotificationBlock,
ObjectStorageBlock,
)
from prefect.exceptions import PrefectException


class TestNotificationBlock:
def test_notification_block_is_abstract(self):
with pytest.raises(
TypeError, match="Can't instantiate abstract class NotificationBlock"
):
NotificationBlock()

def test_notification_block_implementation(self, caplog):
class ANotificationBlock(NotificationBlock):
def notify(self, body, subject=None):
self.logger.info(f"Notification sent with {body} {subject}.")

a_notification_block = ANotificationBlock()
a_notification_block.notify("body", "subject")

# test logging
assert hasattr(a_notification_block, "logger")
assert len(caplog.records) == 1
record = caplog.records[0]
assert record.name == "prefect.ANotificationBlock"
assert record.msg == "Notification sent with body subject."


class JobRunIsRunning(PrefectException):
"""Raised when a job run is still running."""

Expand Down

0 comments on commit 3aa447a

Please sign in to comment.