AWS Neptune: Operators for StartDB and StopDB cluster #29168

Closed · wants to merge 2 commits
113 changes: 113 additions & 0 deletions airflow/providers/amazon/aws/hooks/neptune.py
@@ -0,0 +1,113 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Interact with AWS Neptune."""
from __future__ import annotations

import time
from typing import Callable

from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class NeptuneHook(AwsBaseHook):
"""
Interact with AWS Neptune using the proper client from the boto3 library.

Hook attribute ``conn`` has all the methods listed in the documentation.

.. seealso::
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/neptune.html
- https://docs.aws.amazon.com/neptune/index.html

Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
are passed down to the underlying AwsBaseHook.

.. seealso::
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`

:param aws_conn_id: The Airflow connection used for AWS credentials.
"""
Comment on lines +29 to +46
Contributor:
Please define the Hook docstring the same way it is implemented in other boto3 hooks. See this example:

class EcsHook(AwsGenericHook):
"""
Interact with Amazon Elastic Container Service (ECS).
Provide thin wrapper around :external+boto3:py:class:`boto3.client("ecs") <ECS.Client>`.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
- `Amazon Elastic Container Service \
<https://docs.aws.amazon.com/AmazonECS/latest/APIReference/Welcome.html>`__
"""

You can check in the main branch documentation how it looks.
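
Adapted to this hook, that pattern would look roughly like the following (a sketch only, not final wording — the boto3 cross-reference and the userguide URL are assumptions following the convention above):

    class NeptuneHook(AwsBaseHook):
        """
        Interact with Amazon Neptune.

        Provide thin wrapper around :external+boto3:py:class:`boto3.client("neptune") <Neptune.Client>`.

        Additional arguments (such as ``aws_conn_id``) may be specified and
        are passed down to the underlying AwsBaseHook.

        .. seealso::
            - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
            - `Amazon Neptune \
              <https://docs.aws.amazon.com/neptune/latest/userguide/intro.html>`__
        """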

Author:
How can I check what my current docstring looks like in the web UI? Is there a way?

Contributor:
You can build it in Breeze. Be aware that building the entire documentation takes a lot of time, so make sure you only build the Amazon provider docs.

Contributor:
The command for that is `breeze build-docs --package-filter apache-airflow-providers-amazon` and it does save a lot of build time. 👍


def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "neptune"
super().__init__(*args, **kwargs)

def get_db_cluster_state(self, db_cluster_id: str) -> str:
"""
Get the current state of a DB cluster.

:param db_cluster_id: The ID of the target DB cluster.
:return: The status of the DB cluster as a string (e.g. "available")
:rtype: str
:raises AirflowNotFoundException: If the DB cluster does not exist.
"""
try:
response = self.conn.describe_db_clusters(DBClusterIdentifier=db_cluster_id)
except self.conn.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "DBClusterNotFoundFault":
raise AirflowNotFoundException(e)
raise e
return response["DBClusters"][0]["Status"].lower()

def wait_for_db_cluster_state(
self, db_cluster_id: str, target_state: str, check_interval: int = 30, max_attempts: int = 40
) -> None:
"""
Polls until the target state is reached.
An error is raised after a max number of attempts.

:param db_cluster_id: The ID of the target DB cluster.
:param target_state: Wait until this state is reached
:param check_interval: The amount of time in seconds to wait between attempts
:param max_attempts: The maximum number of attempts to be made

"""

def poke():
return self.get_db_cluster_state(db_cluster_id)

target_state = target_state.lower()
self._wait_for_state(poke, target_state, check_interval, max_attempts)
self.log.info("DB cluster snapshot '%s' reached the '%s' state", db_cluster_id, target_state)
Comment on lines +83 to +88
Contributor:
I think we currently have a different method for waiting operations in new hooks?

@vincbeck @ferruzzi @vandonr-amz Am I right?

Contributor:
Correct! Please use the function waiter defined here
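
For reference, calling that shared helper here might look roughly like this (a sketch — the parameter names are assumptions based on the helper's signature at the time):

    from airflow.providers.amazon.aws.utils.waiter import waiter

    # Sketch: reuse the shared polling helper instead of a hand-rolled loop.
    # get_db_cluster_state already returns the plain status string, so there
    # are no response keys left for parse_response to walk.
    waiter(
        get_state_callable=self.get_db_cluster_state,
        get_state_args={"db_cluster_id": db_cluster_id},
        parse_response=[],
        desired_state={"available"},
        failure_states=set(),
        object_type="DB cluster",
        action="starting",
    )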

Contributor:
Sorry to contradict Vincent, but we should be standardizing on this new Waiter implementation which offloads a lot of the work to the boto API instead.
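
A rough sketch of that style, assuming a custom waiter model (hypothetical name "db_cluster_available") were added to the provider's waiter configs:

    # Sketch: offload the polling to a boto-style waiter instead of a manual
    # loop. The waiter name is hypothetical and would need a matching JSON
    # waiter model in the provider's waiters/ directory.
    self.hook.get_waiter("db_cluster_available").wait(
        DBClusterIdentifier=self.db_identifier,
        WaiterConfig={"Delay": 30, "MaxAttempts": 40},
    )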

Contributor:
Nice catch! No worries at all, it is good to be contradicted :) I forgot we implemented this. Side question: should we then deprecate the waiter function, or is there any use case not covered by the custom waiters that the waiter function satisfies?

Contributor (@ferruzzi, Jan 25, 2023):

I think @syedahsn and I were working on them in parallel without noticing it, but whether there are use cases where his (the one you linked first) is the better answer... I don't think there are, but I could be mistaken.

And yeah, I guess we should flag one as deprecated, or at least leave a comment to that effect so folks don't add to the mess, and set some time aside to do the conversions. Batch is another that has its own unique way of re-implementing the boto waiter and needs to be moved over to a standardized approach at some point.

I think it would be scope creep to have any of that done here, but the new waiters should definitely be done the "right" way at the very least.


def _wait_for_state(
self,
poke: Callable[..., str],
target_state: str,
check_interval: int,
max_attempts: int,
) -> None:
"""
Polls the poke function for the current state until it reaches the target_state.

:param poke: A function that returns the current state of the target resource as a string.
:param target_state: Wait until this state is reached
:param check_interval: The amount of time in seconds to wait between attempts
:param max_attempts: The maximum number of attempts to be made
"""
state = poke()
tries = 1
while state != target_state:
self.log.info("Current state is %s", state)
if tries >= max_attempts:
raise AirflowException("Max attempts exceeded")
time.sleep(check_interval)
state = poke()
tries += 1
152 changes: 152 additions & 0 deletions airflow/providers/amazon/aws/operators/neptune.py
@@ -0,0 +1,152 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook
from airflow.providers.amazon.aws.utils.neptune import NeptuneDbType

if TYPE_CHECKING:
from airflow.utils.context import Context


class NeptuneStartDbOperator(BaseOperator):
"""
Starts a Neptune DB cluster

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:NeptuneStartDbOperator`

:param db_identifier: The AWS identifier of the DB to start
:param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
:param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
:param wait_for_completion: If True, waits for DB to start. (default: True)

Note: boto3 supports starting only DB clusters, not the instance db_type.
So the default is maintained as cluster; this can be extended once the instance db_type is available,
similar to the RDS database implementation.
"""

template_fields = ("db_identifier", "db_type")
STATES_FOR_STARTING = ["available", "starting"]

def __init__(
self,
*,
db_identifier: str,
db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
aws_conn_id: str = "aws_default",
Contributor:
aws_conn_id could be None; in this case the default boto3 strategy would be used.

Suggested change:
- aws_conn_id: str = "aws_default",
+ aws_conn_id: str | None = "aws_default",

region_name: str = "us-east-1",
Contributor (@Taragolis, Jan 25, 2023):
We should not define default region_name

Suggested change:
- region_name: str = "us-east-1",
+ region_name: str | None = None,

wait_for_completion: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.db_identifier = db_identifier
self.hook = NeptuneHook(aws_conn_id=aws_conn_id, region_name=region_name)
Contributor:
You should move the hook definition to a @cached_property, e.g.:

@cached_property
def hook(self) -> AthenaHook:
"""Create and return an AthenaHook."""
return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time, log_query=self.log_query)
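
Adapted here, that would be roughly (a sketch, assuming the operator keeps aws_conn_id and region_name as attributes):

    from functools import cached_property

    @cached_property
    def hook(self) -> NeptuneHook:
        """Create and return a NeptuneHook."""
        return NeptuneHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)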

self.db_identifier = db_identifier
self.db_type = db_type
self.aws_conn_id = aws_conn_id
self.wait_for_completion = wait_for_completion

def execute(self, context: Context) -> str:
self.db_type = NeptuneDbType(self.db_type)
Contributor (@vincbeck, Jan 25, 2023):
You're overriding the value you already set on line 66? I'm not sure I understand what you are trying to achieve here.
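
For context, db_type is in template_fields, so a templated value is only rendered to its final string at execute time; a sketch of how the conversion could be kept without mutating the attribute:

    def execute(self, context: Context) -> str:
        # db_type may arrive as a rendered template string at runtime;
        # normalize it into a local variable instead of reassigning self.db_type.
        db_type = NeptuneDbType(self.db_type)
        ...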

start_db_response = None
if (
self.hook.get_db_cluster_state(self.db_identifier)
not in NeptuneStartDbOperator.STATES_FOR_STARTING
):
self._start_db()

if self.wait_for_completion:
self._wait_until_db_available()
return json.dumps(start_db_response, default=str)
Contributor:
start_db_response is always None?
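
A minimal fix, mirroring _stop_db in the stop operator below, would be to return the API response and capture it in execute():

    def _start_db(self):
        self.log.info("Starting DB %s '%s'", self.db_type.value, self.db_identifier)
        return self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)

    # ...and in execute():
    start_db_response = self._start_db()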


def _start_db(self):
self.log.info("Starting DB %s '%s'", self.db_type.value, self.db_identifier)
self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)

def _wait_until_db_available(self):
self.log.info("Waiting for DB %s to reach 'available' state", self.db_type.value)
self.hook.wait_for_db_cluster_state(self.db_identifier, target_state="available")


class NeptuneStopDbOperator(BaseOperator):
"""
Stops a Neptune DB cluster

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:NeptuneStopDbOperator`

:param db_identifier: The AWS identifier of the DB to stop
:param db_type: Type of the DB - either "instance" or "cluster" (default: "cluster")
:param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
:param wait_for_completion: If True, waits for the DB to stop. (default: True)

Note: boto3 supports stopping only DB clusters, not the instance db_type.
So the default is maintained as cluster; this can be extended once the instance db_type is available,
similar to the RDS database implementation.
"""

template_fields = ("db_identifier", "db_type")
STATES_FOR_STOPPING = ["stopped", "stopping"]

def __init__(
self,
*,
db_identifier: str,
db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
aws_conn_id: str = "aws_default",
region_name: str = "us-east-1",
Comment on lines +118 to +119
Contributor:
Same as above

wait_for_completion: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.hook = NeptuneHook(aws_conn_id=aws_conn_id, region_name=region_name)
self.db_identifier = db_identifier
self.db_type = db_type
self.aws_conn_id = aws_conn_id
self.wait_for_completion = wait_for_completion

def execute(self, context: Context) -> str:
self.db_type = NeptuneDbType(self.db_type)
Contributor:
Same as above

stop_db_response = None
if (
self.hook.get_db_cluster_state(self.db_identifier)
not in NeptuneStopDbOperator.STATES_FOR_STOPPING
):
stop_db_response = self._stop_db()
if self.wait_for_completion:
self._wait_until_db_stopped()
return json.dumps(stop_db_response, default=str)

def _stop_db(self):
self.log.info("Stopping DB %s '%s'", self.db_type.value, self.db_identifier)
response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
return response
Comment on lines +144 to +145
Contributor:
Suggested change:
- response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
- return response
+ return self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)


def _wait_until_db_stopped(self):
self.log.info("Waiting for DB %s to reach 'stopped' state", self.db_type.value)
self.hook.wait_for_db_cluster_state(self.db_identifier, target_state="stopped")


__all__ = ["NeptuneStartDbOperator", "NeptuneStopDbOperator"]
Comment on lines +151 to +152
Contributor:
If you want to include __all__, include it at the top of the module.
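
For example (right after the license header and the __future__ import):

    from __future__ import annotations

    __all__ = ["NeptuneStartDbOperator", "NeptuneStopDbOperator"]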

26 changes: 26 additions & 0 deletions airflow/providers/amazon/aws/utils/neptune.py
@@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from enum import Enum


class NeptuneDbType(Enum):
"""Only available types for the Neptune"""

INSTANCE: str = "instance"
CLUSTER: str = "cluster"
12 changes: 12 additions & 0 deletions airflow/providers/amazon/provider.yaml
@@ -143,6 +143,12 @@ integrations:
external-doc-url: https://aws.amazon.com/kinesis/data-firehose/
logo: /integration-logos/aws/[email protected]
tags: [aws]
- integration-name: Amazon Neptune
external-doc-url: https://aws.amazon.com/neptune/
logo: /integration-logos/aws/[email protected]
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/neptune.rst
tags: [aws]
- integration-name: Amazon RDS
external-doc-url: https://aws.amazon.com/rds/
logo: /integration-logos/aws/[email protected]
@@ -297,6 +303,9 @@ operators:
python-modules:
- airflow.providers.amazon.aws.operators.aws_lambda
- airflow.providers.amazon.aws.operators.lambda_function
- integration-name: Amazon Neptune
python-modules:
- airflow.providers.amazon.aws.operators.neptune
- integration-name: Amazon Simple Storage Service (S3)
python-modules:
- airflow.providers.amazon.aws.operators.s3
@@ -449,6 +458,9 @@ hooks:
- integration-name: Amazon CloudWatch Logs
python-modules:
- airflow.providers.amazon.aws.hooks.logs
- integration-name: Amazon Neptune
python-modules:
- airflow.providers.amazon.aws.hooks.neptune
- integration-name: Amazon RDS
python-modules:
- airflow.providers.amazon.aws.hooks.rds
70 changes: 70 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/neptune.rst
@@ -0,0 +1,70 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

======================================================
Amazon Neptune Documentation
======================================================
Comment on lines +18 to +20
Contributor:
Please make sure the lines of === are the same length as the title.


`Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run
applications that work with highly connected datasets. The core of Neptune is a purpose-built,
high-performance graph database engine that is optimized for storing billions of relationships and
querying the graph with milliseconds latency. Neptune supports the popular graph query languages
Apache TinkerPop Gremlin and W3C's SPARQL, allowing you to build queries that efficiently navigate highly connected
datasets. Neptune powers graph use cases such as recommendation engines, fraud detection, knowledge graphs,
drug discovery, and network security.`
Comment on lines +22 to +28
Contributor:
Suggested change:
- `Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run
- applications that work with highly connected datasets. The core of Neptune is a purpose-built,
- high-performance graph database engine that is optimized for storing billions of relationships and
- querying the graph with milliseconds latency. Neptune supports the popular graph query languages
- Apache TinkerPop Gremlin and W3C's SPARQL, allowing you to build queries that efficiently navigate highly connected
- datasets. Neptune powers graph use cases such as recommendation engines, fraud detection, knowledge graphs,
- drug discovery, and network security.`
+ Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run
+ applications that work with highly connected datasets. The core of Neptune is a purpose-built,
+ high-performance graph database engine that is optimized for storing billions of relationships and
+ querying the graph with milliseconds latency. Neptune supports the popular graph query languages
+ Apache TinkerPop Gremlin and W3C's SPARQL, allowing you to build queries that efficiently navigate highly connected
+ datasets. Neptune powers graph use cases such as recommendation engines, fraud detection, knowledge graphs,
+ drug discovery, and network security.


Prerequisite Tasks
------------------

.. include:: _partials/prerequisite_tasks.rst

Operators
---------

.. _howto/operator:NeptuneStartDbOperator:

Start a database cluster
====================================

To start an Amazon Neptune DB cluster you can use
:class:`~airflow.providers.amazon.aws.operators.neptune.NeptuneStartDbOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_neptune_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_neptune_start_db]
:end-before: [END howto_operator_neptune_start_db]


.. _howto/operator:NeptuneStopDbOperator:

Stop a database cluster
===================================

To stop an Amazon Neptune DB cluster you can use
:class:`~airflow.providers.amazon.aws.operators.neptune.NeptuneStopDbOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_neptune_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_neptune_stop_db]
:end-before: [END howto_operator_neptune_stop_db]

Reference
---------

* `AWS boto3 library documentation for Neptune <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/neptune.html>`__
Binary file added docs/integration-logos/aws/[email protected]