
AWS Neptune: Operators for StartDB and StopDB cluster #29168

Closed · wants to merge 2 commits

Conversation

@swapz-z commented Jan 25, 2023:

closes: #28289

Introduces a hook for the AWS Neptune database, along with Start and Stop DB operators for Neptune clusters:

NeptuneHook
NeptuneStartDbOperator
NeptuneStopDbOperator

The current implementation is heavily inspired by the RDS hooks and operators.


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

db_identifier: str,
db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
aws_conn_id: str = "aws_default",
region_name: str = "us-east-1",
@Taragolis (Contributor) commented Jan 25, 2023:

We should not define a default `region_name`.

Suggested change
region_name: str = "us-east-1",
region_name: str | None = None,

*,
db_identifier: str,
db_type: NeptuneDbType | str = NeptuneDbType.CLUSTER,
aws_conn_id: str = "aws_default",
Contributor commented:

`aws_conn_id` could be None; in that case the default boto3 strategy would be used.

Suggested change
aws_conn_id: str = "aws_default",
aws_conn_id: str | None = "aws_default",
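Both suggestions point the operator's signature at the same idea: let boto3 resolve whatever the user does not specify. A minimal stand-alone sketch of such a signature (the class and field names here are illustrative, not the PR's actual code):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class NeptuneArgs:
    """Toy stand-in for the operator's argument defaults (illustrative only)."""

    db_identifier: str
    # None would fall back to the plain boto3 default credential chain.
    aws_conn_id: str | None = "aws_default"
    # No hard-coded region: None lets the connection/environment decide.
    region_name: str | None = None


args = NeptuneArgs(db_identifier="my-neptune-cluster")
```

With these defaults, a DAG author who sets neither parameter gets the standard Airflow connection with the region resolved from it, instead of silently running in us-east-1.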

):
super().__init__(**kwargs)
self.db_identifier = db_identifier
self.hook = NeptuneHook(aws_conn_id=aws_conn_id, region_name=region_name)
Contributor commented:

You should move the hook definition to a `@cached_property`, e.g.:

@cached_property
def hook(self) -> AthenaHook:
"""Create and return an AthenaHook."""
return AthenaHook(self.aws_conn_id, sleep_time=self.sleep_time, log_query=self.log_query)
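Adapted for Neptune, the pattern the reviewer suggests might look like the following sketch (the hook class here is a stub so the example runs standalone; in the provider it would be the PR's NeptuneHook):

```python
from functools import cached_property


class FakeNeptuneHook:
    """Stub standing in for the PR's NeptuneHook."""

    def __init__(self, aws_conn_id, region_name):
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name


class NeptuneStartDbOperator:
    def __init__(self, db_identifier, aws_conn_id="aws_default", region_name=None):
        # Store only plain attributes in __init__; no hook is created yet.
        self.db_identifier = db_identifier
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name

    @cached_property
    def hook(self) -> FakeNeptuneHook:
        """Create the hook lazily on first access and cache the instance."""
        return FakeNeptuneHook(self.aws_conn_id, self.region_name)


op = NeptuneStartDbOperator("my-cluster")
```

The point of the pattern: instantiating the operator (e.g. during DAG parsing) never touches AWS; the hook is built only when a task actually accesses `op.hook`, and repeated accesses return the same instance.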

Comment on lines +118 to +119
aws_conn_id: str = "aws_default",
region_name: str = "us-east-1",
Contributor commented:

Same as above

Comment on lines +151 to +152

__all__ = ["NeptuneStartDbOperator", "NeptuneStopDbOperator"]
Contributor commented:

If you want to include `__all__`, include it at the top of the module.

Comment on lines +29 to +46
class NeptuneHook(AwsBaseHook):
"""
Interact with AWS Neptune using proper client from the boto3 library.

Hook attribute `conn` has all methods that listed in documentation

.. seealso::
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/neptune.html
- https://docs.aws.amazon.com/neptune/index.html

Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
are passed down to the underlying AwsBaseHook.

.. seealso::
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`

:param aws_conn_id: The Airflow connection used for AWS credentials.
"""
Contributor commented:

Please define the hook docstring the same way it is implemented in other boto3 hooks. See this example:

class EcsHook(AwsGenericHook):
"""
Interact with Amazon Elastic Container Service (ECS).
Provide thin wrapper around :external+boto3:py:class:`boto3.client("ecs") <ECS.Client>`.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
- `Amazon Elastic Container Service \
<https://docs.aws.amazon.com/AmazonECS/latest/APIReference/Welcome.html>`__
"""

You can check the main-branch documentation to see how it looks.
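Following the EcsHook pattern above, the Neptune hook docstring might look like this sketch (the base class is stubbed out here so the snippet runs standalone; in the provider it would subclass AwsBaseHook):

```python
class NeptuneHook:  # in the provider: class NeptuneHook(AwsBaseHook)
    """
    Interact with Amazon Neptune.

    Provide thin wrapper around :external+boto3:py:class:`boto3.client("neptune") <Neptune.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
        - `Amazon Neptune <https://docs.aws.amazon.com/neptune/index.html>`__
    """
```

The key pieces the convention asks for: a one-line summary, the `:external+boto3:` cross-reference to the wrapped client, and a `.. seealso::` block linking the base hook and the service documentation.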

@swapz-z (Author) replied:

How can I check what my current docstring looks like in the web UI? Is there any way?

Contributor replied:

You could build it in Breeze. Be aware that building the entire documentation takes a lot of time, so make sure you only build the docs for Amazon.

Contributor replied:

The command for that is `breeze build-docs --package-filter apache-airflow-providers-amazon`, and it does save a lot of build time. 👍

Comment on lines +83 to +88
def poke():
return self.get_db_cluster_state(db_cluster_id)

target_state = target_state.lower()
self._wait_for_state(poke, target_state, check_interval, max_attempts)
self.log.info("DB cluster snapshot '%s' reached the '%s' state", db_cluster_id, target_state)
Contributor commented:

I think we currently have a different method for waiting operations in new hooks?

@vincbeck @ferruzzi @vandonr-amz Am I right?

Contributor replied:

Correct! Please use the function waiter defined here

Contributor replied:

Sorry to contradict Vincent, but we should be standardizing on this new Waiter implementation, which offloads a lot of the work to the boto API instead.

Contributor replied:

Nice catch! No worries at all, it is good to be contradicted :) I forgot we implemented this. Side question: should we then deprecate the `waiter` function, or is there any use case not covered by the custom waiters that the `waiter` function satisfies?

@ferruzzi (Contributor) commented Jan 25, 2023:

I think @syedahsn and I were working on them in parallel without noticing, but whether there are use cases where his (the one you linked first) is the better answer... I don't think there are, but I could be mistaken.

And yeah, I guess we should flag one as deprecated, or at least leave a comment to that effect so folks don't add to the mess, and set some time aside to do the conversions. Batch is another that has its own unique way of re-implementing the boto waiter and needs to be moved over to a standardized approach at some point.

I think it would be scope creep to have any of that done here, but the new waiters should definitely be done the "right" way at the very least.
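Whichever waiter mechanism the provider settles on, the behavior being discussed is fundamentally a poll-until-state loop. A self-contained sketch of that loop (names are illustrative; the real implementations delegate to hook helpers or boto waiters):

```python
import time


def wait_for_state(poke, target_state, check_interval=0.0, max_attempts=5):
    """Poll poke() until it returns target_state; raise if attempts run out."""
    for _ in range(max_attempts):
        if poke() == target_state:
            return True
        time.sleep(check_interval)
    raise RuntimeError(f"state never reached {target_state!r}")


# Simulate a cluster that becomes available on the third poll.
states = iter(["starting", "starting", "available"])
reached = wait_for_state(lambda: next(states), "available")
```

The standardized approach the reviewers prefer moves this loop out of hand-written hook code and into boto-style waiter configuration, so retries, backoff, and failure states are declared rather than coded.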


self.wait_for_completion = wait_for_completion

def execute(self, context: Context) -> str:
self.db_type = NeptuneDbType(self.db_type)
@vincbeck (Contributor) commented Jan 25, 2023:

You're overriding the value you already set on line 66? I'm not sure I understand what you are trying to achieve here.


if self.wait_for_completion:
self._wait_until_db_available()
return json.dumps(start_db_response, default=str)
Contributor commented:

start_db_response is always None?
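The review point is that `start_db_response` is returned without ever being assigned. A sketch of the intended flow, with the boto3 client stubbed so it runs standalone (the stub's response shape loosely mirrors boto3's `start_db_cluster` output, which is an assumption here):

```python
import json


class FakeNeptuneConn:
    """Stub for hook.conn; the real call is boto3's start_db_cluster."""

    def start_db_cluster(self, DBClusterIdentifier):
        return {"DBCluster": {"DBClusterIdentifier": DBClusterIdentifier,
                              "Status": "starting"}}


conn = FakeNeptuneConn()
# The fix the reviewer is pointing at: actually assign the response
# before serializing and returning it.
start_db_response = conn.start_db_cluster(DBClusterIdentifier="my-cluster")
result = json.dumps(start_db_response, default=str)
```

Without the assignment, `json.dumps(start_db_response, default=str)` would serialize `None` (or raise a NameError), so the operator's return value would carry no information about the started cluster.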

self.wait_for_completion = wait_for_completion

def execute(self, context: Context) -> str:
self.db_type = NeptuneDbType(self.db_type)
Contributor commented:

Same as above

Comment on lines +144 to +145
response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
return response
Contributor commented:

Suggested change
response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
return response
return self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)

Comment on lines +18 to +20
======================================================
Amazon Neptune Documentation
======================================================
Contributor commented:

Please make sure the lines of `===` are the same length as the title.

Comment on lines +22 to +28
`Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run
applications that work with highly connected datasets. The core of Neptune is a purpose-built,
high-performance graph database engine that is optimized for storing billions of relationships and
querying the graph with milliseconds latency. Neptune supports the popular graph query languages
Apache TinkerPop Gremlin and W3C's SPARQL, allowing you to build queries that efficiently navigate highly connected
datasets. Neptune powers graph use cases such as recommendation engines, fraud detection, knowledge graphs,
drug discovery, and network security.`
Contributor commented:

Suggested change
`Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run
applications that work with highly connected datasets. The core of Neptune is a purpose-built,
high-performance graph database engine that is optimized for storing billions of relationships and
querying the graph with milliseconds latency. Neptune supports the popular graph query languages
Apache TinkerPop Gremlin and W3C's SPARQL, allowing you to build queries that efficiently navigate highly connected
datasets. Neptune powers graph use cases such as recommendation engines, fraud detection, knowledge graphs,
drug discovery, and network security.`
Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run
applications that work with highly connected datasets. The core of Neptune is a purpose-built,
high-performance graph database engine that is optimized for storing billions of relationships and
querying the graph with milliseconds latency. Neptune supports the popular graph query languages
Apache TinkerPop Gremlin and W3C's SPARQL, allowing you to build queries that efficiently navigate highly connected
datasets. Neptune powers graph use cases such as recommendation engines, fraud detection, knowledge graphs,
drug discovery, and network security.


# Assuming Neptune DB is already created, its identifier is provided to test NeptuneStartDbOperator
# and NeptuneStopDbOperator
neptune_db_identifier = f"{test_context[ENV_ID_KEY]}-neptune-database"
Contributor commented:

Ideally we try to make system tests as self-contained as possible, which means it would be great if you could create the different resources needed to start the database here. That does not mean creating operators for these actions; you can perform them in custom tasks using the TaskFlow API. A good example is example_batch.py.
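The reviewer's suggestion, sketched: provision and tear down the cluster inside the system test with TaskFlow tasks. The decorator is stubbed below so the sketch runs standalone; in a real system test you would use `from airflow.decorators import task`, and the function bodies would call boto3 (the `create_db_cluster`/`delete_db_cluster` calls named in the comments are real Neptune client operations, but their full parameters are omitted here):

```python
def task(fn):
    """Stand-in for airflow.decorators.task so the sketch runs standalone."""
    return fn


@task
def create_cluster(db_identifier: str) -> str:
    # Real test would call: boto3.client("neptune").create_db_cluster(
    #     DBClusterIdentifier=db_identifier, Engine="neptune")
    return db_identifier


@task
def delete_cluster(db_identifier: str) -> str:
    # Real test would call: boto3.client("neptune").delete_db_cluster(
    #     DBClusterIdentifier=db_identifier, SkipFinalSnapshot=True)
    return db_identifier


ident = create_cluster("env-id-neptune-database")
```

Structured this way, the test owns its resources end to end instead of assuming a pre-existing cluster, which is what makes it runnable in a clean account.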

@eladkal (Contributor) commented Feb 26, 2023:

@swapz-z are you still working on this PR?

@github-actions (bot) commented:

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Apr 13, 2023
@github-actions github-actions bot closed this Apr 18, 2023
Labels: area:providers · area:system-tests · kind:documentation · provider:amazon-aws (AWS/Amazon related issues) · stale (Stale PRs per the .github/workflows/stale.yml policy file)

Successfully merging this pull request may close these issues: Add AWS Neptune hook and operators

5 participants