From 004d1d3a84224ac728f0bcfee68dae13522fe907 Mon Sep 17 00:00:00 2001 From: ellisms <114107920+ellisms@users.noreply.github.com> Date: Fri, 26 Jan 2024 16:11:04 -0500 Subject: [PATCH] Adding Amazon Neptune Hook and Operators (#37000) * neptune operators * System test fix * Update return type Co-authored-by: Wei Lee * Update airflow/providers/amazon/aws/operators/neptune.py Co-authored-by: Wei Lee * Update airflow/providers/amazon/aws/operators/neptune.py Co-authored-by: Wei Lee * Update airflow/providers/amazon/aws/operators/neptune.py Co-authored-by: Wei Lee * PR Review changes * Review changes; fixed databrew waiter test case * Moved cluster states to hook * Update airflow/providers/amazon/aws/operators/neptune.py Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> --------- Co-authored-by: Wei Lee Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> --- airflow/providers/amazon/aws/hooks/neptune.py | 85 +++++++ .../providers/amazon/aws/operators/neptune.py | 218 ++++++++++++++++++ .../providers/amazon/aws/triggers/neptune.py | 115 +++++++++ .../providers/amazon/aws/waiters/neptune.json | 85 +++++++ airflow/providers/amazon/provider.yaml | 15 ++ .../operators/neptune.rst | 77 +++++++ .../aws/Amazon-Neptune_64.png | Bin 0 -> 19338 bytes .../amazon/aws/hooks/test_neptune.py | 52 +++++ .../amazon/aws/operators/test_neptune.py | 152 ++++++++++++ .../amazon/aws/triggers/test_neptune.py | 82 +++++++ .../amazon/aws/waiters/test_neptune.py | 89 +++++++ .../providers/amazon/aws/example_neptune.py | 68 ++++++ 12 files changed, 1038 insertions(+) create mode 100644 airflow/providers/amazon/aws/hooks/neptune.py create mode 100644 airflow/providers/amazon/aws/operators/neptune.py create mode 100644 airflow/providers/amazon/aws/triggers/neptune.py create mode 100644 airflow/providers/amazon/aws/waiters/neptune.json create mode 100644 docs/apache-airflow-providers-amazon/operators/neptune.rst create mode 100644 docs/integration-logos/aws/Amazon-Neptune_64.png create mode 100644 tests/providers/amazon/aws/hooks/test_neptune.py create mode 100644 tests/providers/amazon/aws/operators/test_neptune.py create mode 100644 tests/providers/amazon/aws/triggers/test_neptune.py create mode 100644 tests/providers/amazon/aws/waiters/test_neptune.py create mode 100644 tests/system/providers/amazon/aws/example_neptune.py diff --git a/airflow/providers/amazon/aws/hooks/neptune.py b/airflow/providers/amazon/aws/hooks/neptune.py new file mode 100644 index 0000000000000..a0640647e36de --- /dev/null +++ b/airflow/providers/amazon/aws/hooks/neptune.py @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook + + +class NeptuneHook(AwsBaseHook): + """ + Interact with Amazon Neptune. + + Additional arguments (such as ``aws_conn_id``) may be specified and + are passed down to the underlying AwsBaseHook. + + .. seealso:: + - :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` + """ + + AVAILABLE_STATES = ["available"] + STOPPED_STATES = ["stopped"] + + def __init__(self, *args, **kwargs): + kwargs["client_type"] = "neptune" + super().__init__(*args, **kwargs) + + def wait_for_cluster_availability(self, cluster_id: str, delay: int = 30, max_attempts: int = 60) -> str: + """ + Wait for Neptune cluster to start. + + :param cluster_id: The ID of the cluster to wait for. + :param delay: Time in seconds to delay between polls. + :param max_attempts: Maximum number of attempts to poll for completion. + :return: The status of the cluster. + """ + self.get_waiter("cluster_available").wait( + DBClusterIdentifier=cluster_id, WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts} + ) + + status = self.get_cluster_status(cluster_id) + self.log.info("Finished waiting for cluster %s. Status is now %s", cluster_id, status) + + return status + + def wait_for_cluster_stopped(self, cluster_id: str, delay: int = 30, max_attempts: int = 60) -> str: + """ + Wait for Neptune cluster to stop. + + :param cluster_id: The ID of the cluster to wait for. + :param delay: Time in seconds to delay between polls. + :param max_attempts: Maximum number of attempts to poll for completion. + :return: The status of the cluster. + """ + self.get_waiter("cluster_stopped").wait( + DBClusterIdentifier=cluster_id, WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts} + ) + + status = self.get_cluster_status(cluster_id) + self.log.info("Finished waiting for cluster %s. Status is now %s", cluster_id, status) + + return status + + def get_cluster_status(self, cluster_id: str) -> str: + """ + Get the status of a Neptune cluster. + + :param cluster_id: The ID of the cluster to get the status of. + :return: The status of the cluster. + """ + return self.get_conn().describe_db_clusters(DBClusterIdentifier=cluster_id)["DBClusters"][0]["Status"] diff --git a/airflow/providers/amazon/aws/operators/neptune.py b/airflow/providers/amazon/aws/operators/neptune.py new file mode 100644 index 0000000000000..a55b40c378567 --- /dev/null +++ b/airflow/providers/amazon/aws/operators/neptune.py @@ -0,0 +1,218 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.configuration import conf +from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.neptune import ( + NeptuneClusterAvailableTrigger, + NeptuneClusterStoppedTrigger, +) +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class NeptuneStartDbClusterOperator(AwsBaseOperator[NeptuneHook]): + """Starts an Amazon Neptune DB cluster. + + Amazon Neptune Database is a serverless graph database designed for superior scalability + and availability. Neptune Database provides built-in security, continuous backups, and + integrations with other AWS services + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:NeptuneStartDbClusterOperator` + + :param db_cluster_id: The DB cluster identifier of the Neptune DB cluster to be started. + :param wait_for_completion: Whether to wait for the cluster to start. (default: True) + :param deferrable: If True, the operator will wait asynchronously for the cluster to start. + This implies waiting for completion. This mode requires aiobotocore module to be installed. + (default: False) + :param waiter_delay: Time in seconds to wait between status checks. + :param waiter_max_attempts: Maximum number of attempts to check for job completion. + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is ``None`` or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used. + + :param botocore_config: Configuration dictionary (key-values) for botocore client. See: + https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html + :return: dictionary with Neptune cluster id + """ + + aws_hook_class = NeptuneHook + template_fields: Sequence[str] = aws_template_fields("cluster_id") + + def __init__( + self, + db_cluster_id: str, + wait_for_completion: bool = True, + waiter_delay: int = 30, + waiter_max_attempts: int = 60, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + **kwargs, + ): + super().__init__(**kwargs) + self.cluster_id = db_cluster_id + self.wait_for_completion = wait_for_completion + self.deferrable = deferrable + self.delay = waiter_delay + self.max_attempts = waiter_max_attempts + + def execute(self, context: Context) -> dict[str, str]: + self.log.info("Starting Neptune cluster: %s", self.cluster_id) + + # Check to make sure the cluster is not already available. + status = self.hook.get_cluster_status(self.cluster_id) + if status.lower() in NeptuneHook.AVAILABLE_STATES: + self.log.info("Neptune cluster %s is already available.", self.cluster_id) + return {"db_cluster_id": self.cluster_id} + + resp = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.cluster_id) + status = resp.get("DBClusters", {}).get("Status", "Unknown") + + if self.deferrable: + self.log.info("Deferring for cluster start: %s", self.cluster_id) + + self.defer( + trigger=NeptuneClusterAvailableTrigger( + aws_conn_id=self.aws_conn_id, + db_cluster_id=self.cluster_id, + waiter_delay=self.delay, + waiter_max_attempts=self.max_attempts, + ), + method_name="execute_complete", + ) + + elif self.wait_for_completion: + self.log.info("Waiting for Neptune cluster %s to start.", self.cluster_id) + self.hook.wait_for_cluster_availability(self.cluster_id, self.delay, self.max_attempts) + + return {"db_cluster_id": self.cluster_id} + + def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> dict[str, str]: + status = "" + cluster_id = "" + + if event: + status = event.get("status", "") + cluster_id = event.get("cluster_id", "") + + self.log.info("Neptune cluster %s available with status: %s", cluster_id, status) + + return {"db_cluster_id": cluster_id} + + +class NeptuneStopDbClusterOperator(AwsBaseOperator[NeptuneHook]): + """ + Stops an Amazon Neptune DB cluster. + + Amazon Neptune Database is a serverless graph database designed for superior scalability + and availability. Neptune Database provides built-in security, continuous backups, and + integrations with other AWS services + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:NeptuneStartDbClusterOperator` + + :param db_cluster_id: The DB cluster identifier of the Neptune DB cluster to be stopped. + :param wait_for_completion: Whether to wait for cluster to stop. (default: True) + :param deferrable: If True, the operator will wait asynchronously for the cluster to stop. + This implies waiting for completion. This mode requires aiobotocore module to be installed. + (default: False) + :param waiter_delay: Time in seconds to wait between status checks. + :param waiter_max_attempts: Maximum number of attempts to check for job completion. + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is ``None`` or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used. + :param botocore_config: Configuration dictionary (key-values) for botocore client. See: + https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html + :return: dictionary with Neptune cluster id + """ + + aws_hook_class = NeptuneHook + template_fields: Sequence[str] = aws_template_fields("cluster_id") + + def __init__( + self, + db_cluster_id: str, + wait_for_completion: bool = True, + waiter_delay: int = 30, + waiter_max_attempts: int = 60, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + **kwargs, + ): + super().__init__(**kwargs) + self.cluster_id = db_cluster_id + self.wait_for_completion = wait_for_completion + self.deferrable = deferrable + self.delay = waiter_delay + self.max_attempts = waiter_max_attempts + + def execute(self, context: Context) -> dict[str, str]: + self.log.info("Stopping Neptune cluster: %s", self.cluster_id) + + # Check to make sure the cluster is not already stopped. + status = self.hook.get_cluster_status(self.cluster_id) + if status.lower() in NeptuneHook.STOPPED_STATES: + self.log.info("Neptune cluster %s is already stopped.", self.cluster_id) + return {"db_cluster_id": self.cluster_id} + + resp = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.cluster_id) + status = resp.get("DBClusters", {}).get("Status", "Unknown") + + if self.deferrable: + self.log.info("Deferring for cluster stop: %s", self.cluster_id) + + self.defer( + trigger=NeptuneClusterStoppedTrigger( + aws_conn_id=self.aws_conn_id, + db_cluster_id=self.cluster_id, + waiter_delay=self.delay, + waiter_max_attempts=self.max_attempts, + ), + method_name="execute_complete", + ) + + elif self.wait_for_completion: + self.log.info("Waiting for Neptune cluster %s to start.", self.cluster_id) + self.hook.wait_for_cluster_stopped(self.cluster_id, self.delay, self.max_attempts) + + return {"db_cluster_id": self.cluster_id} + + def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> dict[str, str]: + status = "" + cluster_id = "" + + if event: + status = event.get("status", "") + cluster_id = event.get("cluster_id", "") + + self.log.info("Neptune cluster %s stopped with status: %s", cluster_id, status) + + return {"db_cluster_id": cluster_id} diff --git a/airflow/providers/amazon/aws/triggers/neptune.py b/airflow/providers/amazon/aws/triggers/neptune.py new file mode 100644 index 0000000000000..4b7d34f5423a5 --- /dev/null +++ b/airflow/providers/amazon/aws/triggers/neptune.py @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook +from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger + +if TYPE_CHECKING: + from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook + + +class NeptuneClusterAvailableTrigger(AwsBaseWaiterTrigger): + """ + Triggers when a Neptune Cluster is available. + + :param db_cluster_id: Cluster ID to poll. + :param waiter_delay: The amount of time in seconds to wait between attempts. + :param waiter_max_attempts: The maximum number of attempts to be made. + :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region_name: AWS region name (example: us-east-1) + """ + + def __init__( + self, + *, + db_cluster_id: str, + waiter_delay: int = 30, + waiter_max_attempts: int = 60, + aws_conn_id: str | None = None, + region_name: str | None = None, + **kwargs, + ) -> None: + super().__init__( + serialized_fields={"db_cluster_id": db_cluster_id}, + waiter_name="cluster_available", + waiter_args={"DBClusterIdentifier": db_cluster_id}, + failure_message="Failed to start Neptune cluster", + status_message="Status of Neptune cluster is", + status_queries=["DBClusters[0].Status"], + return_key="db_cluster_id", + return_value=db_cluster_id, + waiter_delay=waiter_delay, + waiter_max_attempts=waiter_max_attempts, + aws_conn_id=aws_conn_id, + **kwargs, + ) + + def hook(self) -> AwsGenericHook: + return NeptuneHook( + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + config=self.botocore_config, + ) + + +class NeptuneClusterStoppedTrigger(AwsBaseWaiterTrigger): + """ + Triggers when a Neptune Cluster is stopped. + + :param db_cluster_id: Cluster ID to poll. + :param waiter_delay: The amount of time in seconds to wait between attempts. + :param waiter_max_attempts: The maximum number of attempts to be made. + :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region_name: AWS region name (example: us-east-1) + """ + + def __init__( + self, + *, + db_cluster_id: str, + waiter_delay: int = 30, + waiter_max_attempts: int = 60, + aws_conn_id: str | None = None, + region_name: str | None = None, + **kwargs, + ) -> None: + super().__init__( + serialized_fields={"db_cluster_id": db_cluster_id}, + waiter_name="cluster_stopped", + waiter_args={"DBClusterIdentifier": db_cluster_id}, + failure_message="Failed to stop Neptune cluster", + status_message="Status of Neptune cluster is", + status_queries=["DBClusters[0].Status"], + return_key="db_cluster_id", + return_value=db_cluster_id, + waiter_delay=waiter_delay, + waiter_max_attempts=waiter_max_attempts, + aws_conn_id=aws_conn_id, + **kwargs, + ) + + def hook(self) -> AwsGenericHook: + return NeptuneHook( + aws_conn_id=self.aws_conn_id, + region_name=self.region_name, + verify=self.verify, + config=self.botocore_config, + ) diff --git a/airflow/providers/amazon/aws/waiters/neptune.json b/airflow/providers/amazon/aws/waiters/neptune.json new file mode 100644 index 0000000000000..d71ee9a75bf51 --- /dev/null +++ b/airflow/providers/amazon/aws/waiters/neptune.json @@ -0,0 +1,85 @@ +{ + "version": 2, + "waiters": { + "cluster_available": { + "operation": "DescribeDBClusters", + "delay": 30, + "maxAttempts": 60, + "acceptors": [ + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "available", + "state": "success" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "deleting", + "state": "failure" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "inaccessible-encryption-credentials", + "state": "failure" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "inaccessible-encryption-credentials-recoverable", + "state": "failure" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "migration-failed", + "state": "failure" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "stopped", + "state": "retry" + } + ] + }, + "cluster_stopped": { + "operation": "DescribeDBClusters", + "delay": 30, + "maxAttempts": 60, + "acceptors": [ + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "stopped", + "state": "success" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "deleting", + "state": "failure" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "inaccessible-encryption-credentials", + "state": "failure" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "inaccessible-encryption-credentials-recoverable", + "state": "failure" + }, + { + "matcher": "path", + "argument": "DBClusters[0].Status", + "expected": "migration-failed", + "state": "failure" + } + ] + } + } +} diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 5bf6a45100772..65603f10986ac 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -337,6 +337,12 @@ integrations: external-doc-url: https://aws.amazon.com/verified-permissions/ logo: /integration-logos/aws/Amazon-Verified-Permissions.png tags: [aws] + - integration-name: Amazon Neptune + external-doc-url: https://aws.amazon.com/neptune/ + logo: /integration-logos/aws/Amazon-Neptune_64.png + how-to-guide: + - /docs/apache-airflow-providers-amazon/operators/neptune.rst + tags: [aws] operators: - integration-name: Amazon Athena @@ -416,6 +422,9 @@ operators: - integration-name: AWS Glue DataBrew python-modules: - airflow.providers.amazon.aws.operators.glue_databrew + - integration-name: Amazon Neptune + python-modules: + - airflow.providers.amazon.aws.operators.neptune sensors: - integration-name: Amazon Athena @@ -602,6 +611,9 @@ hooks: - integration-name: Amazon Verified Permissions python-modules: - airflow.providers.amazon.aws.hooks.verified_permissions + - integration-name: Amazon Neptune + python-modules: + - airflow.providers.amazon.aws.hooks.neptune triggers: - integration-name: Amazon Web Services @@ -654,6 +666,9 @@ triggers: - integration-name: AWS Glue DataBrew python-modules: - airflow.providers.amazon.aws.triggers.glue_databrew + - integration-name: Amazon Neptune + python-modules: + - airflow.providers.amazon.aws.triggers.neptune transfers: - source-integration-name: Amazon DynamoDB diff --git a/docs/apache-airflow-providers-amazon/operators/neptune.rst b/docs/apache-airflow-providers-amazon/operators/neptune.rst new file mode 100644 index 0000000000000..98c0d7dd57c4a --- /dev/null +++ b/docs/apache-airflow-providers-amazon/operators/neptune.rst @@ -0,0 +1,77 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +============== +Amazon Neptune +============== + +`Amazon Neptune Database `__ is a serverless graph database designed +for superior scalability and availability. Neptune Database provides built-in security, +continuous backups, and integrations with other AWS services. + +Prerequisite Tasks +------------------ + +.. include:: ../_partials/prerequisite_tasks.rst + +Generic Parameters +------------------ + +.. include:: ../_partials/generic_parameters.rst + +Operators +--------- + +.. _howto/operator:NeptuneStartDbClusterOperator: + +Start a Neptune database cluster +================================ + +To start a existing Neptune database cluster, you can use +:class:`~airflow.providers.amazon.aws.operators.neptune.StartNeptuneDbClusterOperator`. +This operator can be run in deferrable mode by passing ``deferrable=True`` as a parameter. This requires +the aiobotocore module to be installed. + +.. note:: + This operator only starts an existing Neptune database cluster, it does not create a cluster. + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_neptune.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_start_neptune_cluster] + :end-before: [END howto_operator_start_neptune_cluster] + +.. _howto/operator:StopNeptuneDbClusterOperator: + +Stop a Neptune database cluster +=============================== + +To stop a running Neptune database cluster, you can use +:class:`~airflow.providers.amazon.aws.operators.neptune.StartNeptuneDbClusterOperator`. +This operator can be run in deferrable mode by passing ``deferrable=True`` as a parameter. This requires +the aiobotocore module to be installed. + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_neptune.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_stop_neptune_cluster] + :end-before: [END howto_operator_stop_neptune_cluster] + +Reference +--------- + +* `AWS boto3 library documentation for Neptune `__ diff --git a/docs/integration-logos/aws/Amazon-Neptune_64.png b/docs/integration-logos/aws/Amazon-Neptune_64.png new file mode 100644 index 0000000000000000000000000000000000000000..8dd8f1e80d66a26bde2673453b9858c7ea9c4952 GIT binary patch literal 19338 zcmeFZWm{WM)c=bU+})wLQ>;L7FRsA}?oixain|nPahKq3!KF~#y@lfL_RsIWuXD~D zIFHVC9_%a0%gdR#TC~KqW?*PC-Td+0?a-a{e zZW?luFttt4yK}Q?Y!1C`O6Sch5oWZSzw$XF8B1^@|V-V*d zW`&mPR!U~q>CO4;ET>oRR{P@-$H5PCA2U-%xpx4pWH9z`csQNUyeX^l0BpE$m_!&H z!g%=qnla#{fgLymd}-L!2;aaq=+NF^Cv0lSYSj z9svML%SH+h?aL>Fp5ji&V#h=y4!{P@5K}|@HmJqJOUTsKrBI-63y2+tP67Cb1U+Tr zTMVfp^xdU_yhu=!#08+I9Lj3Vt3l16h7@u`r-18_2nKu+C@hS`g&GMp@qaA;|4sb= z;6@FUg;IBZ~i4 zVr5cwZqFu5b9_0w3FV79m>aPbC&$mjo+eN+){{;}(k# z1e6KkSY*a;WljcH5hCGvr(JvaGJwkO>2A zkX2s%(fOuraq;+fyY}zzT5M5n;feSXFG+D`1U6Ol6Rk=%mJxl+nos>kjdZ`%*4C#m z>A2F+>tz|u`gz^hPtn;w<0(ekVlczYjBCKa(pJOe%cDX>1&Axp2>3iW>nc8N#@D4Z zkco1)yk!2sczo~q^S*}rfM$ZRWqD-!2)*i3qS+UvZ_DOGitSWg45}1_rSap&{#tC4 z{OBU2x{kVwlKER89WIUuy=Xs~0($fXl}bsc%bi8Kd+g#hxjxaGQe#slaELUtU0l$X z8j=6%AeL9x`rpy`J`pOKr(=}oB)1r4OC;udUGj1ZD8rH6=Q!^#1F<2DVZSLf%W$-+ zRG@a4p^Pt_I{4(YKEBBAG7~DH4u!N^o$F;i{(Ky@(JG*=ATEOLMB3togk?j1z5x?! z-OGh6>VA#lhlOD-ZAkHMFL8U!1pG9@mQ0ht%u-6JIveji|}>3n0MMwpXpJ2&fpDDg3ulhmWlME%}(yM%UPct;p0~ zSrM3p=pIJ-n~CAe8JsQYPy+X*@ROYi5U~^^OkwwmN~3p9^I)N;Sa*8#$OvQ zijD)!9zc-{(UKvRmt-=A{lEirZN;yVQhkQue^1pWmT_ri+n~tw2Y{D>-pyH*7iH;#Q83HEX3A+EG(%y zWm)6#LLvlo6(HAb`(;A&ovdxtaT`vk$JU=jtAbA@L=vPGc|U3v^4W9atuuGv&u zxF6yupNe&$wiH|CqS^`b#l?`W#;>U(;T2`rz!h;pd@=kJ@RXc~5o^-*M?Dy{TM}d>DLwd6+ z*HOtG9!>HOh&~c_W{OC47%xluamd`!%<;z9T!bLKHV>2Y3gZJ@X(|RhFp`MNt-M2q zmN??ftQy=Xnnu{kAaQq}e4jVdD``^Xlk-z4aSS+nbNTm0hmgegUDDMbUR1en=b!r1 z(!_HB;+{$Q!~=oESi}UYCy338NN~e6Y~_vjhI2Fk37hFFG-s0VGLiqrP5{J1ljw?> zm*wM7`PfC#S`u;PCHe%z7XrV=ZVzE*J*_|YNd1vF8VAfc>5t;fhAXWo50>{Wo0b&- zyQxjFA+Ee@!kh%eX+EsL^*)4@Q@t>EFCr>KeJteHHlVskSWeEh&NP_uZSEV-jl>rWD$qm&#oft z88Zujlci=mKSu15kJ}O|!D*`?`70$ZK2DfG#dxE703Z$lR|@ESEQVq22t`(iJHam@ zLZG%B&7RS}69iclGo4bKB0$VKQ?s6LLp`@kGtvlPTdG&~imNe-AnB5Q58=)JNlI!g zeELnsmf_0-|k3$f=8`tzky7XJ~-&%fk?zBp*AS+%V1D{JPH*-u*n@AP}XdZ z69+no?}Ufu1)E`n!HlhP6|ybj?BQh)+kYJTUEmTxBi^u>*=5nR_z=EuvoA~1rbhUg z7TQ)c|Dk#1Aslh!<}BVd7grIIj9ZleQ;e=xwTW&gu7|@V3WDvKVqG0ar6s6EM35iX zPm*k3Gew15^%PM4_&tF^pI%1`m&93IZ3bU{G)@ieflCW8`RD5Ki8YLQK?8(9tvA{V zN!ReGq&6jlG%Tzl^{g>2Sa-9~7AVrwz`)O)xm`<+Y8Jyi-AeHAK**uf{%5*?G( zFloN5km6Xn*n!#mq1ukr&jThTj0e*~q)je?jr(qSn~m}cGIEld4HZ_oAj`{vHnFpi zcy{Oe6oG3thSh+2mn8f@G7cq1B$?H5R`m*iG+rfjz--&(bmnQh9P zXsgW|z^*U13600r=X%7QmMlUz!h{m*(m$OsubDr~+Tw}q;#46tROM^cv`WttfR+E)3{InIVDHiM%7K+t42ytT5 zfjjJBU*6FbLf3Yicza8}uGJy)zq?b^&d-{7EBqT%LC+20=ZXp?xX2vWN#e0!@AMom zM?)A?`RqRPJv2T}W#rD#8{2xP<~*(1TidS?87m{|CuJk$N%2E zfb7Eud#i+1qWZ{sym=eHUrLdtJzGx*r3jpt!B8x&5JTa$lK^o4Iz4 z#lEYG^ZLp){hJ2|BvDShf-K5&^0~3+UNdh>B}#;QPeA9ruGzZ{pJ=`N!MDPya&y}P zkg$wV8S=&9=}cuIp@Q6tBR!uZYHrS|R_iwmb|DCeyGsB|fQUD;58@B~@g%6Lll5_7 zdVIWnN%!)DhbzYX*L2#7NlvbG`JI2}H~R`obv!gTw)_Z6ia-)>^L46@yU_>7IV0T` z--6qsq)^(xlD6}V_Nxp-mB_oude?A8qKx>7Fi=>Yg~VgNZ$9dr(tUMIb6FRqtkYX4|F3 zID$b)qvZ2IIj;yl2Gy`NLUE~Syd^v8Mn4M{W=7Rhu6IalcYAB_xie!7lj5a?8?QeVqaQ7SRNc=)kd!2L zxYT^6LQv97v;M)1RzQz_>+8t!%zoe3AC14xY&`lV!tCSmL?V)6_~2RXZFH4i@5w3P zbkN;Sb*00bC1oER;{vShUOMO0+MRr=GQB1%{`lL=7_D%Ma0Ua6GcW|7LgQPO-Lt#) zccaDORM&qhJv?UK_|ev^Wm@}LN`z^3KI&zP9!I^lvfEu>Z5 zQ%EBiFPhekOL`yAxrE{}1ocnp^JMtqe}YT6hIV`90$kyT64W#V}8#NjCWC*5d`z53tcQu@*9P=V2+IG}p9K~C!v z1I;YAg&*7fxmB09x9Ns>_4-!%g4IDpl=_4IEEn!c?kngYmK)q70ACQQ6);(@yj=`x z_UUc+eU0*}M%eJEaA3w1a6jQ1J%mvSef{$O;YSL5{-l8rc;qO@dcwBJE8N2f^p6uo`N=tFxNcxoFwG&-R}?;!Y=8)z4}9 z6?WbgliGY)JaJ*IBA{vfsz06)Q<~u!IcwMSC8SKM_(v`M&c8Z#X<*4Xdaw9$NMBee z*8=3!#>V@4sW`EU|1^@Qr6hEkbD#}zamJL@MW=#EGf|tO(P*+U9Qyr%eLF`5j|ozdZd?H}-YjG>n}V z;_juHNPfl+3^}0xbh~_TKqsY~FfEtD!6u}mlfi+rlnDk@RbQYIC2YOVexwnpO5NSv zjD7k0R}Ve0W;imB$EM04@1Y5pr}K6Vlz*b9_y-n$jz zn=$v}%{h?kzv)HJ&fE?!w5fj!!GZF!T9VPX~iW6<@aC*P=b}@mnLtn}m0rjO~&l zw;)r9Bf4QtR0TRbUD6UCZMBFvluYD{!gh>S{upd|$(u(`%*clcl$`-{%A6R9||>=m9JZ|mpxm;ShV z&e~W+89rEzq3~c^Y#Wgpr;(zZjygNMX&pU>Bt(*AiUaAXwcRA_hyW)s-T>8+$;CIp67SiIQ z026UyxJQk$*nLPo_u8j_?a@&MmMKMWbRK2V(v<*U>|i9REsu@y69a;(5l8{5mKK|_ z*G^MZiEy%d7)B_}P0e5~3S?y#LiZLSN3@eeD0KEuS4C4TbBga%eii+L@4oQ%>ziFj^AII#23KgKBpl4qSE9$M&-SQ)`i4Ov-b861$UBKbVA zCN+$`e_WtsSP0In7m{o<|3f7 z4Ae`nT)b!@Ii#Y=r$FGzNQiA_ETYh7%fOkM8m&!JuP!ZA5(gfC1M~9zDd#31StaL$ zrGhQF$5mWZ(%Hk)fua+>Fn+#p>}?#TIjzwdh`wzD>q4w)oGS1?3|6X;VLV*)X_X{z zw+?~uIm$0|sBn4bB<@6Uh2EYK>eM4v3)k7qVeZc3N>UGhU|T`eJc)Bbvx51fECIZT z{1fo`i}o~3Xfek>K*+Y!;wNecu5F#h`7>%X9v;{JXc{^lPIa~b?Cu*0CE@3ubqa3= z;v8-W6$$t2!dFsOUAHde-p@=z`|X$*XZn#&e`XschK;ru1w_hTdy0rI7=hJ!U}AVO z^tJ^>03s6orXlsKRSc(MC4yAzeri~x?4*$Ha4HaXZ8gE{2N}&WvDw!#2`XFrc z7Mw!%?_v$ee^vJu)vrAKq4=mN_FY+a+j7)U=V{DIP&SaxfGL zs|kJfWuV%g*T!d>k0PIj+h}{`h?LNv0J|-cP&4gr?!WLNE}O#1C8PU?`&KhA&rZfs zWlM;`qM)!|*_98wteJ(x@KrYPRF!sn9k@IbXr^*kL(ZTK-+IS=*N7c~@UkBBD6E2z z>S9r6xVFGH=BY6E*KV;+dc%k+*#+?L#Km{o2_Ql;I4&7IUJ*Bo4R$&1nTbu$@6xbS zt*y{Mp542lIaS-#cpoSA9I1LXay++lbuMoaWUH`2Ot*gOy~-K*nI^sjhXi;3GWp^GysJCEfP>A zh?{k#zB>Vf(|POCW+=#cPemM^v#BiqEE*E-YiYREXje-tvK)7i`7H)?FP&MQp0nss z5oiHm9VFAIbeM=E{(Tigi$aoGisx1lK?5;xvkhScfabXzBBpZEz&vKoLam_ZFtVI2 zNiVMtZ|G_A36DeB(}bT_GxgzPW{a#nx6I$@Oo+fvYy>guH&BB}rE+N2m^JYT)q*cc z1W~tciURK$cDjSv>~1uK;Feo8T=>EZEu6#CH#0R9vmV|iC*LPCH6-i1u6ys}w}gz? zbjo(D7^~x!kBrqDPkS(qosVjc=;km9NXB$(jlxq|6_i8ZGF6DsMQWI8G>}?^+)r^7 zV2)_HPq)eOQX{T^Ec||WdRj4g21k>T810~(0coF1ND%Byc0M8Z)x6Bcejsq> z?T$BZV7f2iFRRFm5)wHFY~56we+W`OZ@0=W#y49#a-dtY{Z0@|DqqxR;JP2&u4l-kY*DaQl=KXEe67aPe0c|h$5B3m{ zxFL2yy!Sc>CTfz{{ILMHckc(amUY1diNQ148(G_P@62QT|p z>>PlMyU{8m`{(z+-iD+6<~7g?ZrTCh)7X!>%;8yR{+{*i7{4LH{2LvjEcv{`_hdic zonfahDW~^Ypq|lAbD2h0zXotGEP!>>f2b%nnT}!AnGX#F3Xa5*^X$DAACX}<(ruoY zTlv}|w90AMk4F;wM2Yr5dH?+06Hby#DBG*B8vK*>*d&GG*SP*;+mA%AWjnnk{k_=e zow|e(qTP@8ztiK2uDDmulzf#hU-X^{DVYziggD$lJiV9C_NxAN$9;|g&~U$_gWkZf z>V;EUn6Ck9zS8TWgY|cdXq5&fEc>!&8x*UjP|}H__fIG-vs#VX&T-LJK)q`22+A&3 z;tpN4#99tVY#qZ!hP6QU-o8k3Etk)08(eWAs=$JQ zK~Imc`a)27mqZwB;}YB&@b_kDgZ5?63`!n$M*80U>UR|fyNX zbe!0C=P#X`yny4eJl!UqK#^U`e-2*xydtBx_w-O4TR#-M!4W4y@%oo&Ys*2A_*2Eo z+1-iD9-i!oYv!672D3a+y!6acrdbvWG6OvP-XsdrrKrtEC{9sMQLGJ0{$?(sWs4Z- zY0A7b5AY3WNvg80kHa__L*ROdu^W4hvz}|PGEsNR3+>{~CWO`>#-_T|a2W3TH$YrO z5B86yA9%sF(C*7#I=3$hWs3`q5ItXvfjzQ;{B_(m7{sz)A;LLO?T}W&m z)hVS|Fh+l$>ZYdli#<(n)wB8OiO#RhR`5+6juN=BlbLRDK0LiYwyegF$b}q!qpIaz zVCl!GMp*_dcf&H->1|zQf9fwx%h3xLXpGr!jqsCBy*lLq1MT!hx!G0FLLM)5F1@s@ zQq)?zZerJ>)}rCFQg5H}!gz4emT4x%-RSbLj0EuHQOkI&EtgtBL6FmLs3){$nc)Ly z8S5^Yb&)baTRPlB%Q(O0F^cd~{1JTG{Y%-JPyWeLFtcz5hsqI8!zS6j*_RIK$! z^Z*IIH~X0?100IT*SFLlIi8Azza>#Ug*C&db3$!Ysn3TOs}FU?!YaSHal{+C;xu|g zQ!1Z~cBHCW>QmPLU|IQ!GMg^zYIx{m61Z8sz$T?tL87@k^tUz#Yu%v%-XHc7V(DRl zZN}+%)HftEfyzh!x;|qmZo>%@%xl>Px?Dq*koJ^IEtcWl3)Ox4emyqw<;A0>QZe?I zvI*4GbRFxb4cnSOh*1Tc3CizYA=)%89ryw!QQeTw=R)Nhe))t>6Vppej@$=Gxaj#^+>5^<~C_wnI{1d85mA7sji>XBMgwkkCgV zCX#-R&Pi7^%dj~9*S4VZh^TxfT_)qd;EZe16^DpBj3A%f2eSt1%+ z2nY|Ua&(35rhyciS6s-#M^%6Zt?2Roj%EkUekOv>*o)KTT>jh&wS7Pm7scQrStd99Go2V=?_Yo$5q$2mA2o1%PH$hu*k)}81#q1N zTYC&_>RvOLbMdV^#W@ji@|EvH!Sa~ldBITb^S#X1h`=CP&-JilJoxne_omB5J2soe zwi}6DIIRlbjtm?T7Cvt_`Xo4{`v4-P-?q{4c;-P&hR!Y7|M#)+es4KEIkGSq8m3(h zz)Y2VURxLJ8_pA){_0e%mR2WaPQTkSH|tigJ%ueQ#mP3|Mw{p+#a(;E-5n8XGlFbD zf?B2LEuQDM<%TWCw30O5yWK|8L$q>x*7U%Sv-nMkLkswhn1KwVnHUzqAj7cLWL=3K&iO{?Q0 z(1F&nxXyZQ6M%G9a1)Z(L|#trF9?SG{r#l4=03aAY+STfaT zX|Pxs3!7ygF4lw5dWVvt{#s>^>~uI# zk()n@Cy(r@2~vmGLGckLjQ13QCbxqgMUvv4@xYR>+I7D-E#L7Z{*COzE&1rN8kC_o zV+C1?2%vSx{-?Gl1&kPV6-Mthd{B5BHxRWz(YUaZy|oVb z%bY7o$O{e|Fcg`>MCKTFb1s)dX)A8NaktsQishk}oB&WIMaa(U_K@I(dC ziQ8c~_%rEP1Nu<+o}BBL<#~NRhnzNs=_dX$1`L7B);e=cze}}bkWUHi2cE-ca-)kE zk*Hz#oJ|I#ufI(ncLw&(Qf35N%CU0>!=!Ox1v(&N5Xh%tLQ1{gm;89_ZTOiZu)7J}wsX5_l_{Ld&WsE&ayw5(WeBtj*);WP zSpk)!WtTuPP^UojUutq`$+5SHYvcuCiB?=k3*}+BDERbq0&w7#b)N_>W$7~W zPENsZ0lpQoKa(ljlZfa6v7xg%$+PSVh>(CGa7%hBN*?zcg%6EhIkcWtmmDl#tUyFUzZ?=!n3p?N+A zXIp40EZ(^ESX>;~Va`3FPcl7uBUfD-xS!S;?R#pO0(Vn0?8Kb8l?|o$D^u9qV?4

MA5gtaP>;ES5Nm>xW(AALgFO+Fh`4hhyx{W81qqGe0P zf(i4YGXAj4uvsu+tOGz8Z*_GM)mcO$u6Chi3qwikXoQ1(=+e6k_)D~ohhH?&!;{+N z%tY19b>;2ZMpBQT9tGX${LyD-o=D0+@K&)q|K%)Er9 zu*vUKjoVqIXwzuaF0XkAwpoF=X~Hl&nml^y)e3Yq;wGr432t_Pga)u89<%5`=c$SU zR1v!(LgH0{+VC-S?ep3VqH>E*j_C2^VOQMIuM_rjn{!0?_{XAzM%H)j4RRHwu<5$B7>nqa2J%BbZ zx_i!o><<)w_ z?GS+sbNb%q#_vD7Np!DxRulP3@2ipSLebXVc@#6UBaFT{)Z$KPR5&g=_!8dRx6Eje zExIXpo+k#9l4yKQP3w7&q{I#{W($p0H|5FxFG%NpQ{%1)+o;OktGqm1IcpIBI9u&) z$0A@&o45CO*A))L?NEFcbA}($}jt3DW(G*{{*w4ED zClM&1{_7K*;yIt^N@rclp2Qc61(%g=sP>Gq`aHjVO0}rP98=qgr3 zVxT`%863s&(8Ww>;43(8jh_-%objd*J|xa0HZC0=LA9{T=~ZS{v0{>W9~vk+9QlU% zi<#Rxw9Ns0@$Kim{FBIeGJeuUf_&m0MKb>n;}QPy;6dR09y4;%1*4$Hz2SGZ$^2hT zKW7}k1cI^2!G%}YOcT_jRZ>zWHHPoEpO;UF-_M$|0t_*l4O5lB@~#w6uq-!4$i31n zCCbSbwlm{N;jD{8MpZdIG>l9liFm#AYmi>v;6$;g>?CcM1V#1XCqfIj3CWE;FXY8rNhgd|c}9`cp{;TMOX< z_3I4GR|z}@TVt+)uqFge5;O#M*El+)1;ca4tuL;@j(*ZCX$(qO#4FvRSMkx&Yu3xQ zJoQkhXQsjO0|{+5WNlH9U|~$smMOJ+KCgS4=ssB`jMpbS4Gg1kE3~xew|MX05O%X& zg7MwFqTqY9(S`XI>PupJzI@BL1(fPBtEoMP@w^ny%Zq<9BX)_IsxJAm*aao3E`P$= zv-Sd83*NNdx74ndK~+ScOp0e64Ybd$18t!#!L&s)kI00H>s<8*kg!m#Zv^Z6ozGu> zS{ERs(X|bSLyoRsXZuow#3zdv)zHZ%@%cAbdLIt@lATo|J#`_42K5>LSaWOo;y+3Z z_h{Guh>R*49rdIGvT;JbeutfEcEN6MN`0UF{0#52GmV?ZMZD6_V-VO(DT-?8_bQ2E zwy&dBNAlKS!EfEPE~3sLOwY!#{1jAE^Pag14MsZWFy6n%w*^D|y{M~;)B#bwf+8B_#b)92C@|D zQ#?o$)<)wppbU>L183-ek`xY3_zW9ix|n5!>3eKU%~UU~$wR#yfBC~UE&tok#>MyE zpJ!_u?^QSWaui3&tJO9tgFGC!GCTSV*8&@1t^DncclkpK4&kI&`G4`Gk1fu!sIAvi zx7>+z&YD{S-p~Ekd+!USmep}p^j%~ANjppnW9`Cp{7ET?xqhAZCdjIbJNMn&-ZfF| z_KK;&$hzFPy1r$6aYU#kQki7tK{F?Jz3#wUoXX3j8~-b7p@+0rv9@Wu@ei_!bL4!8|HZC!M9{77tWJkP|RFklnu$?9{a zzR=>jaOY}bjB6IJPxHl!f}AFZIhftRjsJyu8S-EuRBJj1$g0ngv86yw!Qk(&wrCCr z4IYZR0@_?3aFi0Z_z^b%LY9PsivDm=ijAeIe_we#;J5%^ZU1!__Lp*Wg!O|OLKDGy zH#)`B?;n=G^_<|kd!0Lj;Ga(j_=yKi7*E=YOQ5La#Ybn55L1it@jyI-uyrO@MhIe{rLhkm$a1O^gKjo?c@; z`jhv z+@={)iG*Q(FTq2#BCcPGyey%vIZH4jA)v8 z>I$oYxs}HPz6Ls!F`12C7}p=0sKbxU2-;Xb@39$&_n4c?lP%f#{-^|2$ih9KUPap{Y>~NxdI(b(u zq63}tC!*Pz5Z}s+#~X? z)|UU+eKp@zsKsy2a~4v^Booa^9AYoJ#%$FK=f3FXD7WU99n~w;b0OcJz?Sweb(!ZI zc#t;2{eeu_s}~FFjwgQ`6Hg!GUH^E~&}eupB7Bkg=llwX&NW_Xsf!^H^I#g9>?v^h z`^?ZSedpvGVFe_teJ*lb7D$`-S6l02w+F)uKGiUjEZNd8XrB#A16{F)cPMzXF8d;c zFw^Z+uZn+?zQt{cXXbs_$P|J4mf{s`)*&@KL*qCpvpy-4dtzjB85KN^_5bDL*4(Gm z2}mj|L-!5s4Uy|eX85deKnm%8fxcOPBl^WezJZt4>{Kr1$SH>J%xI}>$>HhD z1o(|zV(16ACAnm~O4yyy+)?XzgyP0$wi5#dBb0r9(dPuo(9VIUjcHoQc#Os)k)Y_P zpwk$Bd>-csgGHL}#hBP@?d(Pq)7Duot^}}6Jq?fg1+`G`<=D6lJn{=xB5$^97yL!= z^7Z=|)g*X@hEvzFhas%mUfXhG0HQ_Id7|inC&T`>Qa9%&Maxza8RGJ(OXE42iPHPu zSn04T=_FLNzAKKr26_q0+;6J&X*4SSe&JwyWujq7epG?>wc4dXnKDOp8&CBC`^#K6 z&?BGB;d1K5q4st|dGq7ZkRj(tm&4o6P{mKvkeZoiuV~AKdD8rT>U||}mf#gc9UnD3 z;c#x8ctY46 zF?Hqx`aFG6#f>!&fdYL229*LOnuHe5&X>VHMYpg2=0*v!RJk3_OYQ?m@wI5M0igr7(Rshw%P}!-b1!e2M$;4t9%jJ@JT)ye#1h zfpq=f7Q|tj;1ecTYKzP-Bq*?9H2I6@Xadg5@(J%tH~*AVR}NFo6SVU@dN4u)c z$4IbuMv^(5hC*H^WO+aos{b^~P&b_|}#Y$MJaNvwGtq{d82@DVx5=oXmasXIh zgWw*Nj7VS%X*jPkA$B5%&cMh&z~bp4Ly7`rNZ+W-N%ZA{KM$~Ml?vy8^ZV5vdb-p1 zielb12k9LzcP9v>!nHXpDh$JD`Zl41Qj#2XsGfAX=o`AW>Y@24sLCa)wqdwL))z=h zgw3bQ3b^24`1*-4ku;%s3NH206HRG}3Qg_g{Sn#!B6pMt^GRo$K_RAy2?HsO#L|vG z2Joe^uprVF6V{N?!th|Y68lUN-#g|SZ{e4xqQ{Y3>Dna?Kdhu570y7tIFztcfx8*j=_2_#?$W5`r$ zoImx}zol2}sm`rnc$*?~-$1S=|3^1yIz4)|iaGK%z~pS7f;0Vqs{N4#t5%DJCZ7a> zYwi&0uDo{tY7wd*Cgy_Hgn8|S9PLj|A20?9!c)L`MyYvE^PSe5+i3KysTE{{CsY!& z&|sxN;lUuN-a+|*|IWY5sD*^^!9NUQJ%PBH{7(F{+pB<>D=Y0buvD`lwcvxmdwv~M zT&h~1f}>JUMH zUB;EYt@`85<4j2&O%rUwNX}Gc-6jxGjG-12*}8GC)Vz&x)L=YlaN0hq!O;5KNqA%m z2?z^K7%@&+5nXg8C!Z7-pHBt>E{TsWeUzf)$fGu7?P)ptZ=61=?9Qn31n1&;_dhkL zLx++NKo!7-DrNPniftwH(OizW9|u;dN3aD!sYg*Yrf9dgHV6=iho|5JYouVV4dTze zcfuL50Xi7Hk58lgeo141x(%}P6YKR}P}+1ts9O!Ym_aWT+p~K*oo03nwTVu!Vq^M7 z_3~1Z#O_Dr997W(>{Dcb3&+vmNuFXFds*G0R8}2}D$U*L&B(Q-jIbIWj}qz_vFsKm428Nt?(ACI=OYw{_r!&+2b)oR4(ai!bo8-9OJPUUrT;DjnZT zq&PjZH{-s+6i*)Mp!&kdWNOto-!FD|MW(+34Oud--W_ds)Pr1fnS=-upLg$&+8SG+ zdQCFbswzJj40LmfFRbrD4z-D!ynG%@u{p+WukfnKBzM82#s*7wKlUrA zj&CD7>LU&HeBmD@$KwjQC1Xesht0qTtC+HzO8A)j(84_8ZXcFuQlXyR5Yqme>3tX7 z_UFo${cExUQ5K+SJwlJzQ&bOchUdKzTeQzWJU9yR@Ynm)NCxP^T|!C@Vh_rSHfGXP_$xT7O-Hj}jBe9?|hhrzNya(wL04FgEr_EWtGX)b%bUHK5OsyUgLzyzjU4*@HQFH!CwOIFo=nQPRd0K4f-vL!q%{I0vl8! z-_SaNG1@q=@_0N1T}z?$>L=%V(7HqP8gq9Hx02UPO2RIzEM&M@y3A1@LRA-M3i7vc z!L@dGV}kt`J8VekMQM;wuL!Z#mzf!ht(UzwXm;tu0>26kfm#karDdz9PY|(r5imQW zwV}xv?9Asql%&-%J|AH?nxn12xeOy%)2A%OMpL3Zf|_BOAijr9D2wzai=y~z(3`H;SSltIm@2&8y3`xcS&_*b%i zzdI{#Gl9mG93q+MvMJ(oFj-$q@Jhuf*!}Z}xUE^W>sL~trMmPGPVhXnu)Nx&0{Jk! ztV}(>Bw7IwA5zdC{82&{j$54x_AmaIXpZ?G^4G6s7x(}Hd{99Uoq*EA8$Q$T8EuK* z*(+~-NQOlw1X8f8GB8)6(Al_^2ce}>ro|Sl*`bHa)}e7Jjp|Gx{aNxl&pd>-Rucu8 z=y0-hY{YYd56*NG$O!z6Jb6)zlO69T8#e=imUa7c%Ph!nNjT7At>~MHVD8}LKK@!b zc+R*|N#QgcR?!%v1V6lq{PYKXrIcBh@S(5KC491{-1K@D8OpSMowNh2OY%NH<;+!ctUXzfuJ3!@!{a`|rO1cG`*f9lb<)+MlyM zpE}FVrePiiCO@faXST>+m7_;Xoh9{T-8HHjxu~is(S+JA|4myH4L7~MNrzV7A#x8M z?D=tuv^@~1`(5bO?;Ond=pA1Mtv0=tZmy;RTi+Y?{>exn1b<3Gext~2RxD&`f^CQ# zr145VjphVv?CLa*x-O3`{{RM5ze3-y$?$TLsKXo9Z zyT{H7c3NtH$t%Q|t5(JN3Y#!(I2eh}`z9wf_MHsqWnPCKCin2#hr#)h4b>b?Z=Da6 z1#bA;tL1l1^e!#>OxyM`I5Om{W#d4>#;KwW^xW>rP zh-@IqipbQ6L+fDD4>?{gMT5=))e=R9SYI-_>gqPOu<= z!!S#fM++}+ge1P8RJqmxg4rV90+fFIOcoK~XC04F9-TWf_-YXqSD|A?zP+^Yk|8Ssba;AqM!~Bc5<^Cak0A3yk3P?9+<<-O z&Qg+)kT`mqKho!_Cw&aUyYA(5hy-~&Ezq2x;4=}R4dd!YFoo2pPCE@8@#lyPUJdrC zG)lm1DY`Vw``DsAXZLsm!PCtT-lmuDaQ)U&)I66YzmjkV|9slSU_6JG-=w#THKQ^R z{~>?hoP^3h$6OR_gtJdMA14Th_xC3`_d#2B+ON29i-9F%NHj54@~-dxw@yBpRk0bi z{{SCxb!79~9#C&1_67KO99X{)e7>8I^n)xb>G=u)@G?s&zEOv+;(g|0^fg6V>~2?_ zED@_$D{EVMhDyV@^(Otxmt7M|!j9RY>;sS{P(r;#12Z}4{P@>b_CE6m3RHa;V(wV* z9)ZKf39d3k%nIB~y=+CU?!Ns$?VR~HlzSY~GRii_Xf(ODxUx+XC6uM8ETghS3uXutGh|6d4z6Wr+|eKsbxD)8xS>s! z$^FjXaqsif=Q-#7ob&yj?{m&`&gc2OUUcz;Z6=D_aWl_ATl;$5qM}h}jEQKTu~U#! zJDv9mYzb5JAkjgrRU@$W9X*5;29?9^-=7=gJuuGcBzQo|$uI{bwvc(wOcywdH-554*GBmTZpgn8uh|?MY9A#YO~~ zMpsd|5}7>&d4Wa8{FH=qTi+boIQP-3)74FH);*LdRpSvq;#$XG=Q-n(=bu;=8^nI+ zBd`g@0pr61LrX^U^U2L5@gzUDy>VNoZU=@oln{p^ag-605%>qRRM`Y*?$K}!=UaUR zahf${S5DN`MO=&w9w|~nCmcrDYUt~g!qTu_DEdXg%B}RP-Oc<|1Z6`|y7mXZ&03%I z)0mx-)$N%JA%EddHjC{q)|B0$GpgN1t07}C%DVB4O=QJnwSGmOe@6X|cinP-3YwbS z=gnSrJRF!a`8?_>uFl&hJHIRI@@G!N&E-|GK{jGr${u`fFWi{65BdEXr|a;tF&CyY#2Mm=gL_9-A{vaYmDa%DG_@rfr#iTymC7)*@j<1h z43DnE@k@{8{4E_fy&3&!km?v2Y<}P@YZEkC9IPiK2Wj^QsmuUsSjmi58*fnn$@eoY z>6a2wJ!D06-Xb0?F+c)sd6~Z(Q#Vd7owlD6F=4lDiy5YN=6KEE%zDRP7>TF_JvC$< z8h5mHM4EOkjLfz)3z<cYY6~&2fA455GwdruLBEbN_Laf>%!=sht4*In!UJK5VKGo8Z)x4=zqwM{qvwg zWZPRCvpFI6>zt`kxcen~5z6KJhzY6=8vQ;`7QPx~_&gx_*NOXX=PZOh`?O#37U+RY z+ce^OX1%vvS&5i`mTKYR(b3nd11GbSt`}P8Pn!BHJ6CmcZ$)%|(Fo_q3%?Q^40;RZ z(#A#wE{%=oqy#1(CbefpQ@nM_rq_cT<^E>vXot)QOzC3#4{CbJ*L2#!7h$v^ftYP5CxxxZ96KYGu0dWpNB=bmBV2GX|f*vmc zk<2Ov=uriru#l1lF0LTM0 Generator[NeptuneHook, None, None]: + """Returns a NeptuneHook mocked with moto""" + with mock_neptune(): + yield NeptuneHook(aws_conn_id="aws_default") + + +@pytest.fixture +def neptune_cluster_id(neptune_hook: NeptuneHook) -> str: + """Returns Neptune cluster ID""" + resp = neptune_hook.conn.create_db_cluster( + DBClusterIdentifier="test-cluster", + Engine="neptune", + ) + + return resp["DBCluster"]["DBClusterIdentifier"] + + +class TestNeptuneHook: + def test_get_conn_returns_a_boto3_connection(self): + hook = NeptuneHook(aws_conn_id="aws_default") + assert hook.get_conn() is not None + + def test_get_cluster_status(self, neptune_hook: NeptuneHook, neptune_cluster_id): + assert neptune_hook.get_cluster_status(neptune_cluster_id) is not None diff --git a/tests/providers/amazon/aws/operators/test_neptune.py b/tests/providers/amazon/aws/operators/test_neptune.py new file mode 100644 index 0000000000000..af7dc289d445a --- /dev/null +++ b/tests/providers/amazon/aws/operators/test_neptune.py @@ -0,0 +1,152 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Generator +from unittest import mock + +import pytest +from moto import mock_neptune + +from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook +from airflow.providers.amazon.aws.operators.neptune import ( + NeptuneStartDbClusterOperator, + NeptuneStopDbClusterOperator, +) + +CLUSTER_ID = "test_cluster" + +EXPECTED_RESPONSE = {"db_cluster_id": CLUSTER_ID} + + +@pytest.fixture +def hook() -> Generator[NeptuneHook, None, None]: + with mock_neptune(): + yield NeptuneHook(aws_conn_id="aws_default") + + +@pytest.fixture +def _create_cluster(hook: NeptuneHook): + hook.conn.create_db_cluster( + DBClusterIdentifier=CLUSTER_ID, + Engine="neptune", + ) + if not hook.conn.describe_db_clusters()["DBClusters"]: + raise ValueError("AWS not properly mocked") + + +class TestNeptuneStartClusterOperator: + @mock.patch.object(NeptuneHook, "conn") + @mock.patch.object(NeptuneHook, "get_waiter") + def test_start_cluster_wait_for_completion(self, mock_hook_get_waiter, mock_conn): + operator = NeptuneStartDbClusterOperator( + task_id="task_test", + db_cluster_id=CLUSTER_ID, + deferrable=False, + wait_for_completion=True, + aws_conn_id="aws_default", + ) + + resp = operator.execute(None) + mock_hook_get_waiter.assert_called_once_with("cluster_available") + assert resp == EXPECTED_RESPONSE + + @mock.patch.object(NeptuneHook, "conn") + @mock.patch.object(NeptuneHook, "get_waiter") + def test_start_cluster_no_wait(self, mock_hook_get_waiter, mock_conn): + operator = NeptuneStartDbClusterOperator( + task_id="task_test", + db_cluster_id=CLUSTER_ID, + deferrable=False, + wait_for_completion=False, + aws_conn_id="aws_default", + ) + + resp = operator.execute(None) + mock_hook_get_waiter.assert_not_called() + assert resp == EXPECTED_RESPONSE + + @mock.patch.object(NeptuneHook, "conn") + @mock.patch.object(NeptuneHook, "get_cluster_status") + @mock.patch.object(NeptuneHook, "get_waiter") + def test_start_cluster_cluster_available(self, mock_waiter, mock_get_cluster_status, mock_conn): + mock_get_cluster_status.return_value = "available" + operator = NeptuneStartDbClusterOperator( + task_id="task_test", + db_cluster_id=CLUSTER_ID, + deferrable=False, + wait_for_completion=True, + aws_conn_id="aws_default", + ) + + resp = operator.execute(None) + + mock_conn.start_db_cluster.assert_not_called() + mock_waiter.assert_not_called() + assert resp == {"db_cluster_id": CLUSTER_ID} + + +class TestNeptuneStopClusterOperator: + @mock.patch.object(NeptuneHook, "conn") + @mock.patch.object(NeptuneHook, "get_waiter") + def test_stop_cluster_wait_for_completion(self, mock_hook_get_waiter, mock_conn): + operator = NeptuneStopDbClusterOperator( + task_id="task_test", + db_cluster_id=CLUSTER_ID, + deferrable=False, + wait_for_completion=True, + aws_conn_id="aws_default", + ) + + resp = operator.execute(None) + mock_hook_get_waiter.assert_called_once_with("cluster_stopped") + assert resp == EXPECTED_RESPONSE + + @mock.patch.object(NeptuneHook, "conn") + @mock.patch.object(NeptuneHook, "get_waiter") + def test_stop_cluster_no_wait(self, mock_hook_get_waiter, mock_conn): + operator = NeptuneStopDbClusterOperator( + task_id="task_test", + db_cluster_id=CLUSTER_ID, + deferrable=False, + wait_for_completion=False, + aws_conn_id="aws_default", + ) + + resp = operator.execute(None) + mock_hook_get_waiter.assert_not_called() + assert resp == EXPECTED_RESPONSE + + @mock.patch.object(NeptuneHook, "conn") + @mock.patch.object(NeptuneHook, "get_cluster_status") + @mock.patch.object(NeptuneHook, "get_waiter") + def test_stop_cluster_cluster_stopped(self, mock_waiter, mock_get_cluster_status, mock_conn): + mock_get_cluster_status.return_value = "stopped" + operator = NeptuneStopDbClusterOperator( + task_id="task_test", + db_cluster_id=CLUSTER_ID, + deferrable=False, + wait_for_completion=True, + aws_conn_id="aws_default", + ) + + resp = operator.execute(None) + + mock_conn.stop_db_cluster.assert_not_called() + mock_waiter.assert_not_called() + assert resp == {"db_cluster_id": CLUSTER_ID} diff --git a/tests/providers/amazon/aws/triggers/test_neptune.py b/tests/providers/amazon/aws/triggers/test_neptune.py new file mode 100644 index 0000000000000..3664e1dedd240 --- /dev/null +++ b/tests/providers/amazon/aws/triggers/test_neptune.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock +from unittest.mock import AsyncMock + +import pytest + +from airflow.providers.amazon.aws.triggers.neptune import ( + NeptuneClusterAvailableTrigger, + NeptuneClusterStoppedTrigger, +) +from airflow.triggers.base import TriggerEvent + +CLUSTER_ID = "test-cluster" + + +class TestNeptuneClusterAvailableTrigger: + def test_serialization(self): + """ + Asserts that the TaskStateTrigger correctly serializes its arguments + and classpath. + """ + trigger = NeptuneClusterAvailableTrigger(db_cluster_id=CLUSTER_ID) + classpath, kwargs = trigger.serialize() + assert classpath == "airflow.providers.amazon.aws.triggers.neptune.NeptuneClusterAvailableTrigger" + assert "db_cluster_id" in kwargs + assert kwargs["db_cluster_id"] == CLUSTER_ID + + @pytest.mark.asyncio + @mock.patch("airflow.providers.amazon.aws.hooks.neptune.NeptuneHook.get_waiter") + @mock.patch("airflow.providers.amazon.aws.hooks.neptune.NeptuneHook.async_conn") + async def test_run_success(self, mock_async_conn, mock_get_waiter): + mock_async_conn.__aenter__.return_value = "available" + mock_get_waiter().wait = AsyncMock() + trigger = NeptuneClusterAvailableTrigger(db_cluster_id=CLUSTER_ID) + generator = trigger.run() + resp = await generator.asend(None) + + assert resp == TriggerEvent({"status": "success", "db_cluster_id": CLUSTER_ID}) + assert mock_get_waiter().wait.call_count == 1 + + +class TestNeptuneClusterStoppedTrigger: + def test_serialization(self): + """ + Asserts that the TaskStateTrigger correctly serializes its arguments + and classpath. + """ + trigger = NeptuneClusterStoppedTrigger(db_cluster_id=CLUSTER_ID) + classpath, kwargs = trigger.serialize() + assert classpath == "airflow.providers.amazon.aws.triggers.neptune.NeptuneClusterStoppedTrigger" + assert "db_cluster_id" in kwargs + assert kwargs["db_cluster_id"] == CLUSTER_ID + + @pytest.mark.asyncio + @mock.patch("airflow.providers.amazon.aws.hooks.neptune.NeptuneHook.get_waiter") + @mock.patch("airflow.providers.amazon.aws.hooks.neptune.NeptuneHook.async_conn") + async def test_run_success(self, mock_async_conn, mock_get_waiter): + mock_async_conn.__aenter__.return_value = "stopped" + mock_get_waiter().wait = AsyncMock() + trigger = NeptuneClusterStoppedTrigger(db_cluster_id=CLUSTER_ID) + generator = trigger.run() + resp = await generator.asend(None) + + assert resp == TriggerEvent({"status": "success", "db_cluster_id": CLUSTER_ID}) + assert mock_get_waiter().wait.call_count == 1 diff --git a/tests/providers/amazon/aws/waiters/test_neptune.py b/tests/providers/amazon/aws/waiters/test_neptune.py new file mode 100644 index 0000000000000..118690e6c9d84 --- /dev/null +++ b/tests/providers/amazon/aws/waiters/test_neptune.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from unittest import mock + +import boto3 +import botocore +import pytest + +from airflow.providers.amazon.aws.hooks.neptune import NeptuneHook + + +class TestCustomNeptuneWaiters: + """Test waiters from ``amazon/aws/waiters/neptune.json``.""" + + @pytest.fixture(autouse=True) + def setup_test_cases(self, monkeypatch): + self.client = boto3.client("neptune", region_name="eu-west-3") + monkeypatch.setattr(NeptuneHook, "conn", self.client) + + def test_service_waiters(self): + hook_waiters = NeptuneHook(aws_conn_id=None).list_waiters() + assert "cluster_available" in hook_waiters + + @pytest.fixture() + def mock_describe_clusters(self): + with mock.patch.object(self.client, "describe_db_clusters") as m: + yield m + + @staticmethod + def get_status_response(status): + return {"DBClusters": [{"Status": status}]} + + def test_cluster_available(self, mock_describe_clusters): + mock_describe_clusters.return_value = {"DBClusters": [{"Status": "available"}]} + waiter = NeptuneHook(aws_conn_id=None).get_waiter("cluster_available") + waiter.wait(DBClusterIdentifier="test_cluster") + + def test_cluster_available_failed(self, mock_describe_clusters): + with pytest.raises(botocore.exceptions.WaiterError): + mock_describe_clusters.return_value = {"DBClusters": [{"Status": "migration-failed"}]} + waiter = NeptuneHook(aws_conn_id=None).get_waiter("cluster_available") + waiter.wait(DBClusterIdentifier="test_cluster") + + def test_starting_up(self, mock_describe_clusters): + """Test job succeeded""" + mock_describe_clusters.side_effect = [ + self.get_status_response("stopped"), + self.get_status_response("starting"), + self.get_status_response("available"), + ] + waiter = NeptuneHook(aws_conn_id=None).get_waiter("cluster_available") + waiter.wait(cluster_identifier="test_cluster", WaiterConfig={"Delay": 0.2, "MaxAttempts": 4}) + + def test_cluster_stopped(self, mock_describe_clusters): + mock_describe_clusters.return_value = {"DBClusters": [{"Status": "stopped"}]} + waiter = NeptuneHook(aws_conn_id=None).get_waiter("cluster_stopped") + waiter.wait(DBClusterIdentifier="test_cluster") + + def test_cluster_stopped_failed(self, mock_describe_clusters): + with pytest.raises(botocore.exceptions.WaiterError): + mock_describe_clusters.return_value = {"DBClusters": [{"Status": "migration-failed"}]} + waiter = NeptuneHook(aws_conn_id=None).get_waiter("cluster_stopped") + waiter.wait(DBClusterIdentifier="test_cluster") + + def test_stopping(self, mock_describe_clusters): + mock_describe_clusters.side_effect = [ + self.get_status_response("available"), + self.get_status_response("stopping"), + self.get_status_response("stopped"), + ] + waiter = NeptuneHook(aws_conn_id=None).get_waiter("cluster_stopped") + waiter.wait(cluster_identifier="test_cluster", WaiterConfig={"Delay": 0.2, "MaxAttempts": 4}) diff --git a/tests/system/providers/amazon/aws/example_neptune.py b/tests/system/providers/amazon/aws/example_neptune.py new file mode 100644 index 0000000000000..fc9d4226b5f00 --- /dev/null +++ b/tests/system/providers/amazon/aws/example_neptune.py @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pendulum + +from airflow.models.baseoperator import chain +from airflow.models.dag import DAG +from airflow.providers.amazon.aws.operators.neptune import ( + NeptuneStartDbClusterOperator, + NeptuneStopDbClusterOperator, +) +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +DAG_ID = "example_neptune" +# This test requires an existing Neptune cluster. +CLUSTER_ID = "CLUSTER_ID" + +sys_test_context_task = SystemTestContextBuilder().add_variable(CLUSTER_ID).build() + +with DAG(DAG_ID, schedule="@once", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False) as dag: + test_context = sys_test_context_task() + env_id = test_context["ENV_ID"] + cluster_id = test_context["CLUSTER_ID"] + + # [START howto_operator_start_neptune_cluster] + start_cluster = NeptuneStartDbClusterOperator( + task_id="start_task", db_cluster_id=cluster_id, deferrable=True + ) + # [END howto_operator_start_neptune_cluster] + + # [START howto_operator_stop_neptune_cluster] + stop_cluster = NeptuneStopDbClusterOperator( + task_id="stop_task", db_cluster_id=cluster_id, deferrable=True + ) + # [END howto_operator_stop_neptune_cluster] + + chain( + # TEST SETUP + test_context, + # TEST BODY + start_cluster, + stop_cluster, + ) + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)