
GlueJobOperator stuck in running state, even when the job is completed on AWS, when Verbose=True #44694

Open

rawwar opened this issue Dec 5, 2024 · 7 comments
Labels
area:providers good first issue kind:bug This is a clearly a bug provider:amazon AWS/Amazon - related issues

Comments

@rawwar
Collaborator

rawwar commented Dec 5, 2024

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==9.1.0

Apache Airflow version

2.10.3

Operating System

ubuntu-22.04

Deployment

Astronomer

Deployment details

No response

What happened

GlueJobOperator was stuck in the running state for a long time, while the actual Glue job on AWS took about a minute to complete. This only happens when verbose is set to True.

What you think should happen instead

The operator should not get stuck when verbose is set to True; it should finish as soon as the Glue job completes.

How to reproduce

I used the following DAG Code:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator


def _start():
    print("Hi")


def _end():
    print("Job end")


with DAG("dag_glue_script_python", catchup=False) as dag:
    start = PythonOperator(task_id="start", python_callable=_start)

    start_glue_job = GlueJobOperator(
        job_name='sleep2',
        task_id='run',
        aws_conn_id='aws_cre',
        create_job_kwargs={"NumberOfWorkers": 1, "WorkerType": "G.1X"},
        stop_job_run_on_kill=True,
        verbose=True,
        wait_for_completion=True,
        deferrable=True,
        job_poll_interval=15,
    )

    end = PythonOperator(task_id="end", python_callable=_end)

start >> start_glue_job >> end

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@rawwar rawwar added kind:bug This is a clearly a bug area:providers needs-triage label for new issues that we didn't triage yet labels Dec 5, 2024
@dosubot dosubot bot added the provider:amazon AWS/Amazon - related issues label Dec 5, 2024
@rawwar
Collaborator Author

rawwar commented Dec 5, 2024

I modified the Glue hook's print_job_logs method (glue.py in amazon provider 9.1.0) and added a few print statements. Here's the updated method and the task logs:

def print_job_logs(
    self,
    job_name: str,
    run_id: str,
    continuation_tokens: LogContinuationTokens,
):
    """
    Print the latest job logs to the Airflow task log and update the continuation tokens.

    :param continuation_tokens: the tokens where to resume from when reading logs.
        The object gets updated with the new tokens by this method.
    """
    log_client = self.logs_hook.get_conn()
    paginator = log_client.get_paginator("filter_log_events")

    def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
        """Mutualize iteration over the 2 different log streams glue jobs write to."""
        print(f"display_logs_from start with log_group={log_group}, continuation_token={continuation_token}")
        fetched_logs = []
        next_token = continuation_token
        try:
            for response in paginator.paginate(
                logGroupName=log_group,
                logStreamNames=[run_id],
                PaginationConfig={"StartingToken": continuation_token},
            ):
                print("paginator response", response)
                fetched_logs.extend([event["message"] for event in response["events"]])
                # if the response is empty there is no nextToken in it
                next_token = response.get("nextToken") or next_token
                print("fetched_logs", fetched_logs)
                print("next_token", next_token)
        except ClientError as e:
            if e.response["Error"]["Code"] == "ResourceNotFoundException":
                # we land here when the log groups/streams don't exist yet
                self.log.warning(
                    "No new Glue driver logs so far.\n"
                    "If this persists, check the CloudWatch dashboard at: %r.",
                    f"https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home",
                )
            else:
                print("error", e)
                raise
        print("finished paginator")
        if len(fetched_logs):
            # Add a tab to indent those logs and distinguish them from airflow logs.
            # Log lines returned already contain a newline character at the end.
            messages = "\t".join(fetched_logs)
            self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
        else:
            self.log.info("No new log from the Glue Job in %s", log_group)
        return next_token

    log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
    log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
    log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
    print(f"log_group_prefix={log_group_prefix}, log_group_default={log_group_default}, log_group_error={log_group_error}")
    # one would think that the error log group would contain only errors, but it actually contains
    # a lot of interesting logs too, so it's valuable to have both
    print("before display_logs_from")
    continuation_tokens.output_stream_continuation = display_logs_from(
        log_group_default, continuation_tokens.output_stream_continuation
    )
    print("After")
    continuation_tokens.error_stream_continuation = display_logs_from(
        log_group_error, continuation_tokens.error_stream_continuation
    )
    print("Done")
Task logs are attached
task logs.log

@rawwar
Collaborator Author

rawwar commented Dec 5, 2024

The issue seems to be that paginator.paginate kept going in a loop, fetching new tokens, until it ultimately failed.
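For anyone trying to reason about this offline: the symptom can be simulated without AWS. The FakePaginator below is a hypothetical stand-in for boto3's filter_log_events paginator that always returns an empty page with a fresh nextToken (which is what the task logs show); a loop that trusts the server to stop issuing tokens then never terminates on its own, so a simple guard such as capping consecutive empty pages breaks the cycle. This is a sketch, not the provider's actual code.

```python
from dataclasses import dataclass, field
from itertools import count


@dataclass
class FakePaginator:
    """Hypothetical stand-in for the filter_log_events paginator:
    every page is empty but carries a brand-new nextToken."""
    _counter: count = field(default_factory=count)

    def paginate(self, **kwargs):
        while True:  # mimics a paginator that never exhausts
            yield {"events": [], "nextToken": f"token-{next(self._counter)}"}


def fetch_logs_bounded(paginator, max_empty_pages=3):
    """Collect log messages, but bail out after a few consecutive
    empty pages instead of trusting the token stream to end."""
    fetched, empty_pages, next_token = [], 0, None
    for page in paginator.paginate():
        events = page["events"]
        fetched.extend(e["message"] for e in events)
        next_token = page.get("nextToken") or next_token
        empty_pages = 0 if events else empty_pages + 1
        if empty_pages >= max_empty_pages:
            break  # guard: without this, the loop above never ends
    return fetched, next_token


logs, token = fetch_logs_bounded(FakePaginator())
print(len(logs), token)  # 0 token-2
```

The guard threshold is arbitrary here; in the real hook it would need to coexist with the continuation-token bookkeeping so that genuinely slow log delivery isn't cut off.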

@eladkal
Contributor

eladkal commented Dec 5, 2024

Interesting timing. There was a fix about verbose for GlueJobTrigger #43622

@eladkal eladkal added good first issue and removed needs-triage label for new issues that we didn't triage yet labels Dec 5, 2024
@rawwar
Collaborator Author

rawwar commented Dec 5, 2024

Interesting timing. There was a fix about verbose for GlueJobTrigger #43622

@eladkal, I don't think the fix in #43622 is relevant here (or maybe you were just mentioning it). The issue happens in both deferrable and non-deferrable mode. Also, I noticed that this issue happens only with one of our customers' AWS accounts; I checked with my personal AWS account with the exact same permissions and I can't replicate it. This makes me think there's some edge case in boto3's paginate which we need to handle in the provider code.

@ferruzzi
Contributor

ferruzzi commented Jan 4, 2025

I can't seem to reproduce this on my laptop either, but would adding a call to get_job_state here do the trick? Something along the lines of:

if len(fetched_logs):
    # Add a tab to indent those logs and distinguish them from airflow logs.
    # Log lines returned already contain a newline character at the end.
    messages = "\t".join(fetched_logs)
    self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
elif self.get_job_state(job_name, run_id) in ["FAILED", "TIMEOUT", "SUCCEEDED", "STOPPED"]:
    # no new logs and the job has terminated
    return
else:
    # no new logs but job isn't finished, print a "waiting..." message
    self.log.info("No new log from the Glue Job in %s", log_group)
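The branching logic of that suggestion can be exercised in isolation with stubs. Everything below is hypothetical scaffolding (report, the job_state argument, and the log_sink list are not the provider's real API); it only demonstrates the intended behavior: log fetched lines, stay silent once the job has reached a terminal state, otherwise emit a "waiting" message.

```python
# Terminal Glue job-run states, as listed in ferruzzi's suggestion above.
TERMINAL_STATES = {"FAILED", "TIMEOUT", "SUCCEEDED", "STOPPED"}


def report(fetched_logs, job_state, log_sink):
    """Sketch of the proposed branch: append fetched lines to the sink,
    return silently when the job is terminal, else note we're waiting."""
    if fetched_logs:
        log_sink.append("\t".join(fetched_logs))
    elif job_state in TERMINAL_STATES:
        return  # job finished and no fresh logs: nothing left to print
    else:
        log_sink.append("No new log from the Glue Job")


sink = []
report([], "RUNNING", sink)    # running, no logs -> "waiting" message
report([], "SUCCEEDED", sink)  # terminal, no logs -> silent
report(["line1\n"], "SUCCEEDED", sink)  # logs always get printed
print(sink)  # ['No new log from the Glue Job', 'line1\n']
```

As the later comment in this thread shows, this check alone does not fix the hang, because the paginator keeps handing out fresh tokens regardless of job state; the early return only suppresses the repeated "no new log" message.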

@rawwar
Collaborator Author

rawwar commented Jan 4, 2025

@ferruzzi , I'll give this a try and give you an update.

@rawwar
Collaborator Author

rawwar commented Jan 22, 2025

@ferruzzi, the job state check did not help. It's the same issue: it keeps fetching new tokens repeatedly. Task logs are attached:
logs.log

Projects
None yet
Development

No branches or pull requests

3 participants