
Commit

fix ruff format issues
gopidesupavan committed Jul 19, 2024
1 parent e7e8325 commit 926efcb
Showing 3 changed files with 106 additions and 85 deletions.
28 changes: 28 additions & 0 deletions airflow/providers/amazon/aws/hooks/kinesis_analytics.py
@@ -32,6 +32,34 @@ class KinesisAnalyticsV2Hook(AwsBaseHook):
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
"""

APPLICATION_START_INTERMEDIATE_STATES: tuple[str, ...] = ("STARTING", "UPDATING", "AUTOSCALING")
APPLICATION_START_FAILURE_STATES: tuple[str, ...] = (
"DELETING",
"STOPPING",
"READY",
"FORCE_STOPPING",
"ROLLING_BACK",
"MAINTENANCE",
"ROLLED_BACK",
)
APPLICATION_START_SUCCESS_STATES: tuple[str, ...] = ("RUNNING",)

APPLICATION_STOP_INTERMEDIATE_STATES: tuple[str, ...] = (
"STARTING",
"UPDATING",
"AUTOSCALING",
"RUNNING",
"STOPPING",
"FORCE_STOPPING",
)
APPLICATION_STOP_FAILURE_STATES: tuple[str, ...] = (
"DELETING",
"ROLLING_BACK",
"MAINTENANCE",
"ROLLED_BACK",
)
APPLICATION_STOP_SUCCESS_STATES: tuple[str, ...] = ("READY",)

def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "kinesisanalyticsv2"
super().__init__(*args, **kwargs)
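
The state tuples above centralize the Managed Flink application lifecycle states in the hook, so sensors (and ad-hoc callers) no longer hard-code status strings. As a minimal sketch of how they can be consumed outside the sensors -- not part of this commit, and assuming a configured AWS connection and an existing application -- a plain polling loop might look like:

import time

from airflow.providers.amazon.aws.hooks.kinesis_analytics import KinesisAnalyticsV2Hook


def wait_for_application_start(application_name: str, poke_interval: int = 120, max_retries: int = 75) -> None:
    # Reuse the hook-level state tuples instead of hard-coding status strings.
    hook = KinesisAnalyticsV2Hook(aws_conn_id="aws_default")  # assumes this connection is configured
    for _ in range(max_retries):
        status = hook.conn.describe_application(ApplicationName=application_name)[
            "ApplicationDetail"
        ]["ApplicationStatus"]
        if status in KinesisAnalyticsV2Hook.APPLICATION_START_SUCCESS_STATES:
            return
        if status in KinesisAnalyticsV2Hook.APPLICATION_START_FAILURE_STATES:
            raise RuntimeError(f"Application start failed, last status: {status}")
        time.sleep(poke_interval)
    raise TimeoutError(f"Application {application_name} did not reach a RUNNING state in time.")
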
162 changes: 77 additions & 85 deletions airflow/providers/amazon/aws/sensors/kinesis_analytics.py
Expand Up @@ -31,7 +31,72 @@
from airflow.utils.context import Context


class KinesisAnalyticsV2StartApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
class KinesisAnalyticsV2BaseSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
"""
General sensor behaviour for AWS Managed Service for Apache Flink.

Subclasses must set the following fields:
- ``INTERMEDIATE_STATES``
- ``FAILURE_STATES``
- ``SUCCESS_STATES``
- ``FAILURE_MESSAGE``
- ``SUCCESS_MESSAGE``

:param application_name: Application name.
:param deferrable: If True, the sensor will operate in deferrable mode. This mode requires aiobotocore
module to be installed.
(default: False, but can be overridden in config file by setting default_deferrable to True)
"""

aws_hook_class = KinesisAnalyticsV2Hook
ui_color = "#66c3ff"

INTERMEDIATE_STATES: tuple[str, ...] = ()
FAILURE_STATES: tuple[str, ...] = ()
SUCCESS_STATES: tuple[str, ...] = ()
FAILURE_MESSAGE = ""
SUCCESS_MESSAGE = ""

def __init__(
self,
application_name: str,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
):
super().__init__(**kwargs)
self.application_name = application_name
self.deferrable = deferrable

def poke(self, context: Context, **kwargs) -> bool:
status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
"ApplicationDetail"
]["ApplicationStatus"]

self.log.info(
"Poking for AWS Managed Service for Apache Flink application: %s status: %s",
self.application_name,
status,
)

if status in self.FAILURE_STATES:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(self.FAILURE_MESSAGE)
raise AirflowException(self.FAILURE_MESSAGE)

if status in self.SUCCESS_STATES:
self.log.info(
"%s `%s`.",
self.SUCCESS_MESSAGE,
self.application_name,
)
return True

return False


class KinesisAnalyticsV2StartApplicationCompletedSensor(KinesisAnalyticsV2BaseSensor):
"""
Waits for AWS Managed Service for Apache Flink application to start.
@@ -59,21 +124,12 @@ class KinesisAnalyticsV2StartApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
"""

aws_hook_class = KinesisAnalyticsV2Hook
ui_color = "#66c3ff"
INTERMEDIATE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_START_INTERMEDIATE_STATES
FAILURE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_START_FAILURE_STATES
SUCCESS_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_START_SUCCESS_STATES

INTERMEDIATE_STATES: tuple[str, ...] = ("STARTING", "UPDATING", "AUTOSCALING")
FAILURE_STATES: tuple[str, ...] = (
"DELETING",
"STOPPING",
"READY",
"FORCE_STOPPING",
"ROLLING_BACK",
"MAINTENANCE",
"ROLLED_BACK",
)
SUCCESS_STATES: tuple[str, ...] = ("RUNNING",)
FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application start failed."
SUCCESS_MESSAGE = "AWS Managed Service for Apache Flink application started successfully"

template_fields: Sequence[str] = aws_template_fields("application_name")

@@ -83,14 +139,12 @@ def __init__(
application_name: str,
max_retries: int = 75,
poke_interval: int = 120,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
super().__init__(application_name=application_name, **kwargs)
self.application_name = application_name
self.max_retries = max_retries
self.poke_interval = poke_interval
self.deferrable = deferrable

def execute(self, context: Context) -> Any:
if self.deferrable:
@@ -110,34 +164,8 @@ def execute(self, context: Context) -> Any:
else:
super().execute(context=context)

def poke(self, context: Context, **kwargs) -> bool:
status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
"ApplicationDetail"
]["ApplicationStatus"]

self.log.info(
"Poking for AWS Managed Service for Apache Flink application: %s status: %s",
self.application_name,
status,
)

if status in self.FAILURE_STATES:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(self.FAILURE_MESSAGE)
raise AirflowException(self.FAILURE_MESSAGE)

if status in self.SUCCESS_STATES:
self.log.info(
"AWS Managed Service for Apache Flink application started successfully `%s`.",
self.application_name,
)
return True

return False


class KinesisAnalyticsV2StopApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
class KinesisAnalyticsV2StopApplicationCompletedSensor(KinesisAnalyticsV2BaseSensor):
"""
Waits for AWS Managed Service for Apache Flink application to stop.
@@ -165,20 +193,12 @@ class KinesisAnalyticsV2StopApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
"""

aws_hook_class = KinesisAnalyticsV2Hook
ui_color = "#66c3ff"
INTERMEDIATE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_STOP_INTERMEDIATE_STATES
FAILURE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_STOP_FAILURE_STATES
SUCCESS_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_STOP_SUCCESS_STATES

INTERMEDIATE_STATES: tuple[str, ...] = (
"STARTING",
"UPDATING",
"AUTOSCALING",
"RUNNING",
"STOPPING",
"FORCE_STOPPING",
)
FAILURE_STATES: tuple[str, ...] = ("DELETING", "ROLLING_BACK", "MAINTENANCE", "ROLLED_BACK")
SUCCESS_STATES: tuple[str, ...] = ("READY",)
FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application stop failed."
SUCCESS_MESSAGE = "AWS Managed Service for Apache Flink application stopped successfully"

template_fields: Sequence[str] = aws_template_fields("application_name")

@@ -188,14 +208,12 @@ def __init__(
application_name: str,
max_retries: int = 75,
poke_interval: int = 120,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
super().__init__(application_name=application_name, **kwargs)
self.application_name = application_name
self.max_retries = max_retries
self.poke_interval = poke_interval
self.deferrable = deferrable

def execute(self, context: Context) -> Any:
if self.deferrable:
@@ -214,29 +232,3 @@ def execute(self, context: Context) -> Any:
)
else:
super().execute(context=context)

def poke(self, context: Context, **kwargs) -> bool:
status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
"ApplicationDetail"
]["ApplicationStatus"]

self.log.info(
"Poking for AWS Managed Service for Apache Flink application: %s status: %s",
self.application_name,
status,
)

if status in self.FAILURE_STATES:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(self.FAILURE_MESSAGE)
raise AirflowException(self.FAILURE_MESSAGE)

if status in self.SUCCESS_STATES:
self.log.info(
"AWS Managed Service for Apache Flink application stopped successfully `%s`.",
self.application_name,
)
return True

return False
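
With the shared poke logic moved into KinesisAnalyticsV2BaseSensor, the two public sensors only declare their state tuples and messages. For orientation only -- this DAG is not part of the commit, and the application name and connection are hypothetical -- typical usage looks like:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.kinesis_analytics import (
    KinesisAnalyticsV2StartApplicationCompletedSensor,
    KinesisAnalyticsV2StopApplicationCompletedSensor,
)

with DAG(dag_id="example_kinesis_analytics_sensors", start_date=datetime(2024, 7, 1), schedule=None):
    wait_for_start = KinesisAnalyticsV2StartApplicationCompletedSensor(
        task_id="wait_for_start",
        application_name="my-flink-app",  # hypothetical application
        poke_interval=60,
        max_retries=20,
        deferrable=True,  # needs aiobotocore; set False to use the worker-side poke loop
    )
    wait_for_stop = KinesisAnalyticsV2StopApplicationCompletedSensor(
        task_id="wait_for_stop",
        application_name="my-flink-app",
    )
    wait_for_start >> wait_for_stop
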
1 change: 1 addition & 0 deletions tests/always/test_project_structure.py
@@ -533,6 +533,7 @@ class TestAmazonProviderProjectStructure(ExampleCoverageTest):
"airflow.providers.amazon.aws.transfers.base.AwsToAwsBaseOperator",
"airflow.providers.amazon.aws.operators.comprehend.ComprehendBaseOperator",
"airflow.providers.amazon.aws.sensors.comprehend.ComprehendBaseSensor",
"airflow.providers.amazon.aws.sensors.kinesis_analytics.KinesisAnalyticsV2BaseSensor",
}

MISSING_EXAMPLES_FOR_CLASSES = {
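The test change registers the new base class so the project-structure check does not require an example DAG for it. Roughly -- names here are simplified and hypothetical, since the full test is not visible in this hunk -- the pattern amounts to excluding base classes before asserting example coverage:

BASE_CLASSES = {
    "airflow.providers.amazon.aws.sensors.kinesis_analytics.KinesisAnalyticsV2BaseSensor",
    # ... other abstract/base operators and sensors ...
}


def classes_requiring_examples(all_provider_classes: set[str]) -> set[str]:
    # Base classes are not user-facing, so they are exempt from example-DAG coverage.
    return all_provider_classes - BASE_CLASSES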
