Skip to content

Commit

Permalink
Add EMR Notebook operators (apache#28312)
Browse files Browse the repository at this point in the history
  • Loading branch information
syedahsn authored and gschuurman committed Dec 19, 2022
1 parent cd687ef commit db345b5
Show file tree
Hide file tree
Showing 6 changed files with 763 additions and 7 deletions.
165 changes: 165 additions & 0 deletions airflow/providers/amazon/aws/operators/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,171 @@ def execute(self, context: Context) -> list[str]:
return emr_hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps, wait_for_completion=True)


class EmrStartNotebookExecutionOperator(BaseOperator):
    """
    An operator that starts an EMR notebook execution.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrStartNotebookExecutionOperator`

    :param editor_id: The unique identifier of the EMR notebook to use for notebook execution.
    :param relative_path: The path and file name of the notebook file for this execution,
        relative to the path specified for the EMR notebook.
    :param cluster_id: The unique identifier of the EMR cluster the notebook is attached to.
    :param service_role: The name or ARN of the IAM role that is used as the service role
        for Amazon EMR (the EMR role) for the notebook execution.
    :param notebook_execution_name: Optional name for the notebook execution. When omitted,
        a name of the form ``emr_notebook_<uuid4>`` is generated.
    :param notebook_params: Input parameters in JSON format passed to the EMR notebook at
        runtime for execution.
    :param notebook_instance_security_group_id: The unique identifier of the Amazon EC2
        security group to associate with the EMR notebook for this notebook execution.
    :param master_instance_security_group_id: Optional unique ID of an EC2 security
        group to associate with the master instance of the EMR cluster for this notebook execution.
    :param tags: Optional list of key value pair to associate with the notebook execution.
    :param wait_for_completion: If True, the operator will wait for the notebook execution
        to be in a RUNNING or FINISHED state. Defaults to False.
    :param aws_conn_id: aws connection to use.
    :param waiter_countdown: Total amount of time the operator will wait for the notebook
        execution to start. Defaults to 25 * 60 seconds.
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the notebook.
        Defaults to 60 seconds.
    """

    template_fields: Sequence[str] = (
        "editor_id",
        "cluster_id",
        "relative_path",
        "service_role",
        "notebook_execution_name",
        "notebook_params",
        "notebook_instance_security_group_id",
        "master_instance_security_group_id",
        "tags",
    )

    def __init__(
        self,
        editor_id: str,
        relative_path: str,
        cluster_id: str,
        service_role: str,
        notebook_execution_name: str | None = None,
        notebook_params: str | None = None,
        notebook_instance_security_group_id: str | None = None,
        master_instance_security_group_id: str | None = None,
        tags: list | None = None,
        wait_for_completion: bool = False,
        aws_conn_id: str = "aws_default",
        waiter_countdown: int = 25 * 60,
        waiter_check_interval_seconds: int = 60,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.editor_id = editor_id
        self.relative_path = relative_path
        self.service_role = service_role
        # Optional values that were not supplied are normalised here so that
        # execute() can forward every attribute unconditionally.
        self.notebook_execution_name = notebook_execution_name or f"emr_notebook_{uuid4()}"
        self.notebook_params = notebook_params or ""
        self.notebook_instance_security_group_id = notebook_instance_security_group_id or ""
        self.tags = tags or []
        self.wait_for_completion = wait_for_completion
        self.cluster_id = cluster_id
        self.aws_conn_id = aws_conn_id
        self.waiter_countdown = waiter_countdown
        self.waiter_check_interval_seconds = waiter_check_interval_seconds
        self.master_instance_security_group_id = master_instance_security_group_id

    def execute(self, context: Context):
        """
        Start the notebook execution and optionally wait for it to be running.

        :param context: Airflow task context (not used by this method).
        :return: the ``NotebookExecutionId`` reported by the start call.
        :raises AirflowException: if the start call's HTTP status code is not 200.
        """
        execution_engine = {
            "Id": self.cluster_id,
            "Type": "EMR",
            "MasterInstanceSecurityGroupId": self.master_instance_security_group_id or "",
        }
        emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)

        response = emr_hook.conn.start_notebook_execution(
            EditorId=self.editor_id,
            RelativePath=self.relative_path,
            NotebookExecutionName=self.notebook_execution_name,
            NotebookParams=self.notebook_params,
            ExecutionEngine=execution_engine,
            ServiceRole=self.service_role,
            NotebookInstanceSecurityGroupId=self.notebook_instance_security_group_id,
            Tags=self.tags,
        )

        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"Starting notebook execution failed: {response}")

        notebook_execution_id = response["NotebookExecutionId"]
        self.log.info("Notebook execution started: %s", notebook_execution_id)

        if self.wait_for_completion:
            # Poll describe_notebook_execution until the execution is RUNNING or
            # FINISHED; a FAILED status is treated as a failure by the waiter.
            waiter(
                get_state_callable=emr_hook.conn.describe_notebook_execution,
                get_state_args={"NotebookExecutionId": notebook_execution_id},
                parse_response=["NotebookExecution", "Status"],
                desired_state={"RUNNING", "FINISHED"},
                failure_states={"FAILED"},
                object_type="notebook execution",
                action="starting",
                countdown=self.waiter_countdown,
                check_interval_seconds=self.waiter_check_interval_seconds,
            )
        return notebook_execution_id


class EmrStopNotebookExecutionOperator(BaseOperator):
    """
    An operator that stops a running EMR notebook execution.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrStopNotebookExecutionOperator`

    :param notebook_execution_id: The unique identifier of the notebook execution.
    :param wait_for_completion: If True, the operator will wait for the notebook
        to be in a STOPPED or FINISHED state. Defaults to False.
    :param aws_conn_id: aws connection to use.
    :param waiter_countdown: Total amount of time the operator will wait for the notebook to stop.
        Defaults to 25 * 60 seconds.
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the notebook.
        Defaults to 60 seconds.
    """

    template_fields: Sequence[str] = ("notebook_execution_id",)

    def __init__(
        self,
        notebook_execution_id: str,
        wait_for_completion: bool = False,
        aws_conn_id: str = "aws_default",
        waiter_countdown: int = 25 * 60,
        waiter_check_interval_seconds: int = 60,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.notebook_execution_id = notebook_execution_id
        self.aws_conn_id = aws_conn_id
        self.wait_for_completion = wait_for_completion
        self.waiter_countdown = waiter_countdown
        self.waiter_check_interval_seconds = waiter_check_interval_seconds

    def execute(self, context: Context) -> None:
        """
        Request the stop of the notebook execution and optionally wait for it.

        :param context: Airflow task context (not used by this method).
        """
        hook = EmrHook(aws_conn_id=self.aws_conn_id)
        hook.conn.stop_notebook_execution(NotebookExecutionId=self.notebook_execution_id)

        # Fire-and-forget unless the caller asked to block until the stop lands.
        if not self.wait_for_completion:
            return

        # Poll describe_notebook_execution until the status is STOPPED or
        # FINISHED; a FAILED status is treated as a failure by the waiter.
        waiter(
            get_state_callable=hook.conn.describe_notebook_execution,
            get_state_args={"NotebookExecutionId": self.notebook_execution_id},
            parse_response=["NotebookExecution", "Status"],
            desired_state={"STOPPED", "FINISHED"},
            failure_states={"FAILED"},
            object_type="notebook execution",
            action="stopped",
            countdown=self.waiter_countdown,
            check_interval_seconds=self.waiter_check_interval_seconds,
        )


class EmrEksCreateClusterOperator(BaseOperator):
"""
An operator that creates EMR on EKS virtual clusters.
Expand Down
74 changes: 67 additions & 7 deletions airflow/providers/amazon/aws/sensors/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ def poke(self, context: Context):
return True

if state in self.failed_states:
final_message = "EMR job failed"
failure_message = self.failure_message_from_response(response)
if failure_message:
final_message += " " + failure_message
raise AirflowException(final_message)
raise AirflowException(f"EMR job failed: {self.failure_message_from_response(response)}")

return False

Expand All @@ -93,7 +89,7 @@ def get_emr_response(self) -> dict[str, Any]:
@staticmethod
def state_from_response(response: dict[str, Any]) -> str:
"""
Get state from response dictionary.
Get state from boto3 response.
:param response: response from AWS API
:return: state
Expand All @@ -103,7 +99,7 @@ def state_from_response(response: dict[str, Any]) -> str:
@staticmethod
def failure_message_from_response(response: dict[str, Any]) -> str | None:
"""
Get failure message from response dictionary.
Get state from boto3 response.
:param response: response from AWS API
:return: failure message
Expand Down Expand Up @@ -299,6 +295,70 @@ def hook(self) -> EmrContainerHook:
return EmrContainerHook(self.aws_conn_id, virtual_cluster_id=self.virtual_cluster_id)


class EmrNotebookExecutionSensor(EmrBaseSensor):
    """
    Polls the state of the EMR notebook execution until it reaches any of the target states.

    If a failure state is reached, the sensor throws an error, and fails the task.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:EmrNotebookExecutionSensor`

    :param notebook_execution_id: Unique id of the notebook execution to be poked.
    :param target_states: the states the sensor will wait for the execution to reach.
        Default target_states is ``FINISHED``.
    :param failed_states: if the execution reaches any of the failed_states, the sensor will fail.
        Default failed_states is ``FAILED``.
    """

    template_fields: Sequence[str] = ("notebook_execution_id",)

    # Defaults used when the caller does not pass failed_states / target_states.
    FAILURE_STATES = {"FAILED"}
    COMPLETED_STATES = {"FINISHED"}

    def __init__(
        self,
        notebook_execution_id: str,
        target_states: Iterable[str] | None = None,
        failed_states: Iterable[str] | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.notebook_execution_id = notebook_execution_id
        self.target_states = target_states or self.COMPLETED_STATES
        self.failed_states = failed_states or self.FAILURE_STATES

    def get_emr_response(self) -> dict[str, Any]:
        """
        Describe the notebook execution with boto3.

        .. seealso::
            https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.describe_notebook_execution

        :return: response of ``describe_notebook_execution``
        """
        emr_client = self.get_hook().get_conn()
        self.log.info("Poking notebook %s", self.notebook_execution_id)

        return emr_client.describe_notebook_execution(NotebookExecutionId=self.notebook_execution_id)

    @staticmethod
    def state_from_response(response: dict[str, Any]) -> str:
        """
        Get state from boto3 response.

        :param response: response from AWS API
        :return: state of the notebook execution
        """
        return response["NotebookExecution"]["Status"]

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """
        Get failure message from response dictionary.

        :param response: response from AWS API
        :return: the ``LastStateChangeReason`` of the notebook execution, or None when absent
        """
        notebook_execution = response["NotebookExecution"]
        return notebook_execution.get("LastStateChangeReason", None)


class EmrJobFlowSensor(EmrBaseSensor):
"""
Asks for the state of the EMR JobFlow (Cluster) until it reaches
Expand Down
42 changes: 42 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/emr.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,51 @@ To modify an existing EMR container you can use
:start-after: [START howto_operator_emr_modify_cluster]
:end-before: [END howto_operator_emr_modify_cluster]

.. _howto/operator:EmrStartNotebookExecutionOperator:

Start an EMR notebook execution
====================================

You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrStartNotebookExecutionOperator` to
start a notebook execution on an existing notebook attached to a running cluster.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_notebook_execution.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_emr_start_notebook_execution]
    :end-before: [END howto_operator_emr_start_notebook_execution]

.. _howto/operator:EmrStopNotebookExecutionOperator:

Stop an EMR notebook execution
====================================

You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrStopNotebookExecutionOperator` to
stop a running notebook execution.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_notebook_execution.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_emr_stop_notebook_execution]
    :end-before: [END howto_operator_emr_stop_notebook_execution]

Sensors
-------

.. _howto/sensor:EmrNotebookExecutionSensor:

Wait on an EMR notebook execution state
=======================================

To monitor the state of an EMR notebook execution you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrNotebookExecutionSensor`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_notebook_execution.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_emr_notebook_execution]
    :end-before: [END howto_sensor_emr_notebook_execution]

.. _howto/sensor:EmrJobFlowSensor:

Wait on an Amazon EMR job flow state
Expand Down
Loading

0 comments on commit db345b5

Please sign in to comment.