Deferred Glue Operators seem to always be verbose #43620

Closed

jimwbaldwin opened this issue Nov 4, 2024 · 10 comments
Labels
area:logging, area:providers, kind:bug, needs-triage, provider:amazon

Comments

@jimwbaldwin
Contributor

jimwbaldwin commented Nov 4, 2024

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.28.0

Apache Airflow version

2.10.1

Operating System

Amazon Linux 2023

Deployment

Amazon (AWS) MWAA

Deployment details

MWAA in production and mwaa-local-runner for local testing. Bug exists on both.

What happened

Deferred GlueJobOperator tasks are failing due to rate limiting when fetching logs from CloudWatch, even though we have verbose set to False. The traceback shows the function print_job_logs, which should only be called when verbose is True. Additionally, when I watch the task logs in real time, they are definitely pulling the Glue logs from CloudWatch into Airflow.

Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
    async for event in trigger.run():

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/triggers/glue.py", line 73, in run
    await hook.async_job_completion(self.job_name, self.run_id, self.verbose)

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 315, in async_job_completion
    ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 334, in _handle_state
    self.print_job_logs(

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 278, in print_job_logs
    continuation_tokens.output_stream_continuation = display_logs_from(
                                                     ^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 245, in display_logs_from
    for response in paginator.paginate(

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 269, in __iter__
    response = self._make_request(current_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 357, in _make_request
    return self._method(**current_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 1023, in _make_api_call
    raise error_class(parsed_response, operation_name)

botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the FilterLogEvents operation (reached max retries: 4): Rate exceeded

A colleague and I read through the operator code and it all looks correct, so we suspect the issue occurs when the task is serialized, as the serialization specifically converts the bool to a string. The string "False" is then truthy and causes this code to run. I'm guessing that the tasks are serialized when they are deferred.

Serialization:

def serialize(self) -> tuple[str, dict[str, Any]]:
    return (
        # dynamically generate the fully qualified name of the class
        self.__class__.__module__ + "." + self.__class__.__qualname__,
        {
            "job_name": self.job_name,
            "run_id": self.run_id,
            "verbose": str(self.verbose),
            "aws_conn_id": self.aws_conn_id,
            "job_poll_interval": self.job_poll_interval,
        },
    )

Code which is getting called when verbose is false:

if verbose:
    self.print_job_logs(
        job_name=job_name,
        run_id=run_id,
        continuation_tokens=next_log_tokens,
    )

Example python showing bool serialization issues:

Python 3.11.7 (main, Oct 26 2024, 04:00:37) [GCC 11.4.1 20230605 (Red Hat 11.4.1-2)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> a = False
>>> str(a)
'False'
>>> bool(str(a))
True
>>>

What you think should happen instead

CloudWatch logs should not be read into Airflow when verbose is False.

If the serialization process is the issue, then either the serialization should be changed (e.g. keep the bool rather than casting it to a string) or the deserialization should be changed to understand "True" and "False".
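
For illustration, a minimal sketch of the second option (the helper name is hypothetical, not provider code): coerce a serialized "True"/"False" string back to a bool before it is used.

def _coerce_verbose(value) -> bool:
    # A real bool passes through untouched.
    if isinstance(value, bool):
        return value
    # Otherwise only the literal string "true" (any case) counts as True.
    return str(value).strip().lower() == "true"

assert _coerce_verbose(False) is False
assert _coerce_verbose("False") is False
assert _coerce_verbose("True") is True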

How to reproduce

  1. Create a GlueJobOperator task.
  2. Set deferrable=True and verbose=False
  3. Run the task and watch the Airflow logs while it is in a deferred state.
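
For reference, here is a minimal DAG sketch of the steps above (resource names are placeholders, not our actual job):

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

with DAG(dag_id="glue_verbose_repro", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    GlueJobOperator(
        task_id="run_glue_job",
        job_name="my-existing-glue-job",                  # placeholder
        script_location="s3://my-bucket/scripts/job.py",  # placeholder
        s3_bucket="my-bucket",                            # placeholder
        iam_role_name="my-glue-role",                     # placeholder
        deferrable=True,
        verbose=False,  # logs should not be pulled into Airflow, but they are
    )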

Anything else

This seems to happen every time I watch the tasks. Completed tasks (success or failed) do not show the logs, but deferred tasks do, which to me supports some kind of serialization bug in the operator.

The traceback shows that the log-fetching code is being called at run time, even though it does not appear in the task log.

I would be willing to submit a PR if someone could assist.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@jimwbaldwin added the area:providers, kind:bug and needs-triage labels on Nov 4, 2024

boring-cyborg bot commented Nov 4, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@dosubot (bot) added the area:logging and provider:amazon labels on Nov 4, 2024
@jimwbaldwin
Contributor Author

I had a quick look at other serialize functions and they just seem to build a dict[str, Any] without changing the types. Could it be as simple as removing that str()?
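
For illustration, the change would look something like this (a sketch against the serialize method quoted above, not a tested patch):

def serialize(self) -> tuple[str, dict[str, Any]]:
    return (
        # dynamically generate the fully qualified name of the class
        self.__class__.__module__ + "." + self.__class__.__qualname__,
        {
            "job_name": self.job_name,
            "run_id": self.run_id,
            "verbose": self.verbose,  # keep the bool instead of str(self.verbose)
            "aws_conn_id": self.aws_conn_id,
            "job_poll_interval": self.job_poll_interval,
        },
    )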

@jimwbaldwin
Contributor Author

There is a test for verbose=True but not for verbose=False; since verbose is always True in the test, it will always pass.

@mock.patch.object(GlueJobHook, "print_job_logs")
@mock.patch.object(GlueJobHook, "get_job_state")
@mock.patch.object(GlueJobHook, "initialize_job")
@mock.patch.object(GlueJobHook, "get_conn")
@mock.patch.object(S3Hook, "load_file")
def test_execute_with_verbose_logging(
self, mock_load_file, mock_get_conn, mock_initialize_job, mock_get_job_state, mock_print_job_logs
):
glue = GlueJobOperator(
task_id=TASK_ID,
job_name=JOB_NAME,
script_location="s3_uri",
s3_bucket="bucket_name",
iam_role_name="role_arn",
verbose=True,
)
mock_initialize_job.return_value = {"JobRunState": "RUNNING", "JobRunId": JOB_RUN_ID}
mock_get_job_state.return_value = "SUCCEEDED"
glue.execute(mock.MagicMock())
mock_initialize_job.assert_called_once_with({}, {})
mock_print_job_logs.assert_called_once_with(
job_name=JOB_NAME, run_id=JOB_RUN_ID, continuation_tokens=mock.ANY
)
assert glue.job_name == JOB_NAME
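
For what it's worth, a test along these lines would catch the bug directly (a sketch, assuming the trigger class GlueJobCompleteTrigger and the existing JOB_NAME / JOB_RUN_ID test constants): after serialize(), verbose should still be a real bool.

from airflow.providers.amazon.aws.triggers.glue import GlueJobCompleteTrigger

def test_glue_trigger_serialize_keeps_verbose_bool():
    trigger = GlueJobCompleteTrigger(
        job_name=JOB_NAME,
        run_id=JOB_RUN_ID,
        verbose=False,
        aws_conn_id="aws_default",
        job_poll_interval=10,
    )
    _, kwargs = trigger.serialize()
    assert kwargs["verbose"] is False  # fails today: it is serialized as the string "False"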

@eladkal
Contributor

eladkal commented Nov 4, 2024

Feel free to raise a PR with your code suggestions (and improved tests). It will be easier to review and look into it with a PR.

@gopidesupavan
Member

I didn’t quite understand your question. In your configuration, the deferrable parameter is set to False in the reproduce step (step 2). This means the operator won’t use the deferrable mode; instead, it will execute normally on the worker.

@jimwbaldwin
Contributor Author

I didn’t quite understand your question. In your configuration, the deferrable parameter is set to False in the reproduce step (step 2). This means the operator won’t use the deferrable mode; instead, it will execute normally on the worker.

Sorry, I made a mistake typing it up. I'll fix it.

@jimwbaldwin
Contributor Author

Fixed in description.

  1. Create a GlueJobOperator task.
  2. Set deferrable=True and verbose=False
  3. Run the task and watch the Airflow logs while it is in a deferred state.

@jimwbaldwin
Contributor Author

Heya, I've added a test to this PR which shows the bug.
#43622

@jimwbaldwin
Contributor Author

Here is the test showing the problem; it is currently failing.
[screenshot: failing test run]

@jimwbaldwin
Contributor Author

This should be fixed now by the PR referenced above.
