Fix AWS EmrStepSensor ignoring the specified aws_conn_id in deferred mode #33952

Merged

Conversation

yermalov-here

airflow 2.7.0
amazon-provider 8.6.0

Problem
In deferred mode, the AWS EmrStepSensor falls back to the aws_default connection after deferral and ignores the specified connection ID.

Root-cause
Internally, EmrStepSensorTrigger initializes EmrHook without specifying a value for aws_conn_id and instead sets emr_conn_id, which the trigger does not use. Thus, after serialization, the hook built in the triggerer uses the default value for aws_conn_id (which is aws_default).
See the example log at the bottom (the Airflow instance there does not have the aws_default connection configured). Note that before deferral the sensor uses the aws connection, but afterwards it switches to aws_default.
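
The argument binding behind this can be reproduced without Airflow. The following is a minimal sketch using hypothetical stand-in classes (`FakeAwsBaseHook`, `FakeEmrHook`) that only mimic the constructor shape described here; they are not the provider's real classes, and the default names are assumptions for illustration.

```python
from __future__ import annotations


class FakeAwsBaseHook:
    """Hypothetical stand-in for the AWS base hook; models only the aws_conn_id default."""

    def __init__(self, aws_conn_id: str | None = "aws_default", **kwargs) -> None:
        self.aws_conn_id = aws_conn_id


class FakeEmrHook(FakeAwsBaseHook):
    """Hypothetical stand-in whose first positional parameter is emr_conn_id."""

    def __init__(self, emr_conn_id: str | None = "emr_default", *args, **kwargs) -> None:
        self.emr_conn_id = emr_conn_id
        kwargs["client_type"] = "emr"
        super().__init__(*args, **kwargs)


# Positional call: the value lands in emr_conn_id, aws_conn_id keeps its default.
buggy = FakeEmrHook("aws")
# Keyword call: the value reaches aws_conn_id as intended.
fixed = FakeEmrHook(aws_conn_id="aws")

print(buggy.emr_conn_id, buggy.aws_conn_id)  # aws aws_default
print(fixed.emr_conn_id, fixed.aws_conn_id)  # emr_default aws
```

Passing the connection ID by keyword removes the dependency on parameter order, which is why the same change is applied defensively elsewhere in this PR.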

Scope
This PR

  1. fixes the EmrHook initialization in the EmrStepSensorTrigger
  2. extends the emr triggers serialization test to check the hooks' aws_conn_id value
  3. and makes aws_conn_id assignment explicit for hook initialization in the EmrContainerTrigger to avoid the same problem in case of implementation changes.
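
The kind of check described in item 2 might look roughly like the sketch below. It assumes a trigger whose `serialize()` returns a `(classpath, kwargs)` tuple, and it uses hypothetical stand-ins (`FakeHook`, `FakeTrigger`, `roundtrip`) rather than the provider's actual trigger classes or test suite.

```python
from __future__ import annotations

from typing import Any


class FakeHook:
    """Hypothetical hook stand-in that only stores the connection ID."""

    def __init__(self, aws_conn_id: str | None = "aws_default") -> None:
        self.aws_conn_id = aws_conn_id


class FakeTrigger:
    """Hypothetical trigger stand-in with a serialize() / hook() pair."""

    def __init__(self, job_id: str, aws_conn_id: str = "aws_default") -> None:
        self.job_id = job_id
        self.aws_conn_id = aws_conn_id

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Classpath plus the kwargs needed to rebuild the trigger elsewhere.
        return (
            f"{__name__}.FakeTrigger",
            {"job_id": self.job_id, "aws_conn_id": self.aws_conn_id},
        )

    def hook(self) -> FakeHook:
        # Keyword argument, so the value cannot bind to another parameter.
        return FakeHook(aws_conn_id=self.aws_conn_id)


def roundtrip(trigger: FakeTrigger) -> FakeTrigger:
    """Rebuild a trigger from its serialized kwargs, as a triggerer process would."""
    _classpath, kwargs = trigger.serialize()
    return FakeTrigger(**kwargs)


def check_hook_conn_id_survives_serialization() -> None:
    original = FakeTrigger(job_id="s-STEPID", aws_conn_id="aws")
    rebuilt = roundtrip(original)
    # The hook created after deserialization must still use the custom connection.
    assert rebuilt.hook().aws_conn_id == "aws"


check_hook_conn_id_survives_serialization()
```

Asserting on the hook's attribute, and not only on the serialized kwargs, is what would catch a trigger that stores the right value but hands it to the wrong hook parameter.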
[2023-01-01, 13:52:27 UTC] {task_command.py:415} INFO - Running <TaskInstance: dagname.wait_emr_step scheduled__2023-01-01 00:00:00+00:00 [running]> on host INSTANCE_01
[2023-01-01, 13:52:27 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dagname' AIRFLOW_CTX_TASK_ID='wait_emr_step' AIRFLOW_CTX_EXECUTION_DATE='2023-01-01T00:00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-01-01 00:00:00+00:00'
[2023-01-01, 13:52:27 UTC] {base.py:73} INFO - Using connection ID 'aws' for task execution.
[2023-01-01, 13:52:27 UTC] {emr.py:575} INFO - Poking step s-STEPID on cluster j-CLUSTERID
[2023-01-01, 13:52:28 UTC] {emr.py:79} INFO - Job flow currently PENDING
[2023-01-01, 13:52:28 UTC] {taskinstance.py:1526} INFO - Pausing task as DEFERRED. dag_id=dagname, task_id=wait_emr_step, execution_date=20230101T000000, start_date=20230101T000000
[2023-01-01, 13:52:28 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 100 (task deferral)
[2023-01-01, 13:52:28 UTC] {base_aws.py:566} WARNING - Unable to find AWS Connection ID 'aws_default', switching to empty.
[2023-01-01, 13:52:28 UTC] {base_aws.py:160} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name=None). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2023-01-01, 13:52:40 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dagname.wait_emr_step scheduled__2023-01-01 00:00:00+00:00 [queued]>
[2023-01-01, 13:52:40 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dagname.wait_emr_step scheduled__2023-01-01 00:00:00+00:00 [queued]>
[2023-01-01, 13:52:40 UTC] {taskinstance.py:1359} INFO - Resuming after deferral
[2023-01-01, 13:52:40 UTC] {taskinstance.py:1382} INFO - Executing <Task(EmrStepSensor): wait_emr_step> on 2023-01-01 00:00:00+00:00
[2023-01-01, 13:52:40 UTC] {standard_task_runner.py:57} INFO - Started process 259 to run task
[2023-01-01, 13:52:40 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'dagname', 'wait_emr_step', 'scheduled__2023-01-01 00:00:00+00:00', '--job-id', '2661', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmpbugsw1g7']
[2023-01-01, 13:52:40 UTC] {standard_task_runner.py:85} INFO - Job 2661: Subtask wait_emr_step
[2023-01-01, 13:52:40 UTC] {task_command.py:415} INFO - Running <TaskInstance: dagname.wait_emr_step scheduled__2023-01-01 00:00:00+00:00 [running]> on host INSTANCE_02
[2023-01-01, 13:52:40 UTC] {taskinstance.py:1720} ERROR - Trigger failed:
Traceback (most recent call last):
  File "airflow/jobs/triggerer_job_runner.py", line 527, in cleanup_finished_triggers
    result = details["task"].result()
  File "airflow/jobs/triggerer_job_runner.py", line 599, in run_trigger
    async for event in trigger.run():
  File "airflow/providers/amazon/aws/triggers/base.py", line 118, in run
    async with hook.async_conn as client:
  File "aiobotocore/session.py", line 27, in __aenter__
    self._client = await self._coro
  File "aiobotocore/session.py", line 211, in _create_client
    client = await client_creator.create_client(
  File "aiobotocore/client.py", line 76, in create_client
    client_args = self._get_client_args(
  File "aiobotocore/client.py", line 267, in _get_client_args
    return args_creator.get_client_args(
  File "aiobotocore/args.py", line 31, in get_client_args
    final_args = self.compute_client_args(
  File "botocore/args.py", line 219, in compute_client_args
    endpoint_config = self._compute_endpoint_config(
  File "botocore/args.py", line 368, in _compute_endpoint_config
    return self._resolve_endpoint(**resolve_endpoint_kwargs)
  File "botocore/args.py", line 473, in _resolve_endpoint
    return endpoint_bridge.resolve(
  File "botocore/client.py", line 595, in resolve
    resolved = self.endpoint_resolver.construct_endpoint(
  File "botocore/regions.py", line 229, in construct_endpoint
    result = self._endpoint_for_partition(
  File "botocore/regions.py", line 277, in _endpoint_for_partition
    raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.
[2023-01-01, 13:52:40 UTC] {taskinstance.py:1943} ERROR - Task failed with exception
airflow.exceptions.TaskDeferralError: Trigger failure


@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Aug 31, 2023
Comment on lines 244 to +245
  def hook(self) -> AwsGenericHook:
-     return EmrContainerHook(self.aws_conn_id)
+     return EmrContainerHook(aws_conn_id=self.aws_conn_id)

Nice catch. Hooks mix positional and keyword arguments, and this might lead to a problem when we create a thick wrapper hook:

def __init__(self, emr_conn_id: str | None = default_conn_name, *args, **kwargs) -> None:
    self.emr_conn_id = emr_conn_id
    kwargs["client_type"] = "emr"
    super().__init__(*args, **kwargs)

@vandonr-amz Not related to this PR, but why did we decide to make hook a method instead of a property here?

Just wondering. Maybe there were some serialization issues or something like that.

I don't remember exactly, maybe it is because it actually builds the hook every time, so it shouldn't be used "like a property".

@Taragolis Taragolis Aug 31, 2023

For this case we usually decorate it with @cached_property. In general, that pattern is used in a lot of places across the different providers, especially when you need to move something heavy out of the constructor while keeping a compatible attribute.

Anyway it was just a question, no offence
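
To make the trade-off in this thread concrete, here is a small sketch comparing the two styles. `FakeHook`, `MethodStyle`, and `CachedStyle` are hypothetical names used only for illustration; none of them come from the provider code.

```python
from functools import cached_property


class FakeHook:
    """Hypothetical stand-in for a hook that is expensive to build."""


class MethodStyle:
    def hook(self) -> FakeHook:
        # A new hook is constructed on every call.
        return FakeHook()


class CachedStyle:
    @cached_property
    def hook(self) -> FakeHook:
        # Constructed on first access, then stored on the instance.
        return FakeHook()


m = MethodStyle()
first, second = m.hook(), m.hook()
method_builds_new = first is not second

c = CachedStyle()
cached_reuses = c.hook is c.hook

print(method_builds_new, cached_reuses)  # True True
```

When the hook is requested only once, as noted in this thread, both styles end up doing the same amount of work.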

Yes, sure we can, but it shouldn't be necessary, as we use it in only one place.

@vincbeck vincbeck merged commit 097e3e6 into apache:main Aug 31, 2023
@yermalov-here yermalov-here deleted the fix_amazon_emr_step_sensor_deferred branch August 31, 2023 16:21