
Modify KPO to log container log periodically #37279

Merged
merged 7 commits into from
Feb 12, 2024

Conversation

@pankajastro (Member)

The current state of the KubernetesPodOperator (KPO)
only prints container logs at the end of task execution.
While this is sufficient for short-running tasks,
it becomes less user-friendly when the container runs for an extended period.
This PR enhances the KPO by modifying the trigger and operator
to fetch container logs periodically,
making it possible to monitor the task's progress in the Airflow task UI.

A new parameter has been introduced to the operator:

  • logging_interval: the maximum time, in seconds, that the task should
    remain deferred before resuming to fetch the latest logs.
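The intended defer/resume cycle can be sketched in plain Python (hypothetical names; the real implementation lives in the KPO operator and trigger):

```python
def run_with_periodic_logs(fetch_logs, pod_done, wait, logging_interval=30):
    """Sketch of the loop this PR introduces: instead of one silent wait
    until the pod finishes, the task defers for at most `logging_interval`
    seconds at a time, resumes to fetch the newest container logs (so they
    appear in the task UI), then defers again until the pod is done."""
    collected = []
    while not pod_done():
        wait(logging_interval)          # deferred sleep, capped at logging_interval
        collected.extend(fetch_logs())  # resume briefly and pull the latest logs
    return collected
```

Here `fetch_logs`, `pod_done`, and `wait` are stand-ins for the Kubernetes API calls and the triggerer's async sleep.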


boring-cyborg bot added the area:providers and provider:cncf-kubernetes labels Feb 9, 2024
@pankajastro pankajastro marked this pull request as ready for review February 12, 2024 04:44
@pankajkoti (Member) left a comment

LGTM. CI is reporting 1 failure for mypy.

@pankajkoti (Member)

The only failed test in the previous CI run has passed now: https://github.com/apache/airflow/actions/runs/7868578876/job/21466117939#step:24:1.

I think we're good to merge @pankajastro

@pankajastro pankajastro merged commit 053485b into apache:main Feb 12, 2024
44 checks passed
@pankajastro pankajastro deleted the add_trigger_re-etnry branch February 12, 2024 07:58
        )
        if self.do_xcom_push:
            return result

    def execute_complete(self, context: Context, event: dict, **kwargs):
How is this method used, and where?


@pankajastro can you please take a look? With the use of the new method trigger_reentry, if execute_complete is no longer getting used anywhere, perhaps we could remove the method and clean this up?


I wonder if it was kept for b/c (backward compatibility).

I prefer to not include this change in the current providers wave if possible to have the time to test it. WDYT?


Thanks @hussein-awala

Actually, it would be nice for us to include this in the providers wave, as we are depending on the change 🙂.
We tested this in our internal tests and they worked fine. How about we test this with the RC? Would that time be sufficient if we include this? We can work on providing fixes with the utmost priority if a bug is identified. Or do you see some other risks?

@pankajastro (Member, Author)

Thanks for the feedback. Yes, this method is public, so we can't remove it. Similarly, a couple of methods in the trigger are now unused; I kept them as well because they might be used by someone.

We have conducted some testing on our end. Once the RC is available, we will perform additional tests. Hopefully, some community members will also test the RC, so we should be able to find and fix anything unusual before releasing it.
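Keeping a public method alive while steering callers to its replacement is usually done with a thin deprecated wrapper, roughly like this (an illustrative sketch with hypothetical class and return values, not the provider's actual code):

```python
import warnings


class PodOperatorSketch:
    """Illustrative stand-in for the KPO; method names mirror the discussion."""

    def trigger_reentry(self, context, event):
        # New entry point invoked when the task resumes from deferral.
        return f"handled {event['status']}"

    def execute_complete(self, context, event, **kwargs):
        # Kept because it is public API; subclasses may still call it.
        warnings.warn(
            "execute_complete is deprecated; use trigger_reentry instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.trigger_reentry(context, event)
```

Existing subclasses keep working unchanged, and users see a DeprecationWarning pointing them at the new method.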

@vchiapaikeo (Contributor)

@pankajastro, @pankajkoti - this is causing failures for deferrable tasks that should complete. I reverted this commit and things work normally afterwards. Hoping you can help figure out the issue or revert this before the providers push.

Sample DAG:

from airflow import DAG

from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKEStartPodOperator,
)


DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "start_date": "2021-04-20",
    "retries": 0,
    "retry_delay": 60,
}

with DAG(
    dag_id="test_gke_op",
    schedule_interval="@daily",
    max_active_runs=1,
    max_active_tasks=5,
    catchup=False,
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    _ = GKEStartPodOperator(
        task_id="whoami",
        name="whoami",
        cmds=["gcloud"],
        arguments=["auth", "list"],
        image="gcr.io/google.com/cloudsdktool/cloud-sdk:slim",
        project_id="redacted-project-id",
        namespace="airflow-default",
        location="us-central1",
        cluster_name="airflow-gke-cluster",
        service_account_name="default",
        deferrable=True,
        # do_xcom_push=True,
    )

    _ = GKEStartPodOperator(
        task_id="fail",
        name="fail",
        cmds=["bash"],
        arguments=["-xc", "sleep 2 && exit 1"],
        image="gcr.io/google.com/cloudsdktool/cloud-sdk:slim",
        project_id="redacted-project-id",
        namespace="airflow-default",
        location="us-central1",
        cluster_name="airflow-gke-cluster",
        service_account_name="default",
        deferrable=True,
        # do_xcom_push=True,
    )

Expected result (obtained after reverting):

[screenshot]

Unexpected result (after rebasing - whoami should succeed):

[screenshot]

whoami logs:

fcd7bd221fe9
*** Found local files:
***   * /root/airflow/logs/dag_id=test_gke_op/run_id=scheduled__2024-02-11T00:00:00+00:00/task_id=whoami/attempt=11.log
***   * /root/airflow/logs/dag_id=test_gke_op/run_id=scheduled__2024-02-11T00:00:00+00:00/task_id=whoami/attempt=11.log.trigger.40.log
[2024-02-12, 09:12:23 EST] {taskinstance.py:1994} INFO - Dependencies all met for dep_context=non-requeueable deps ti=
[2024-02-12, 09:12:23 EST] {taskinstance.py:1994} INFO - Dependencies all met for dep_context=requeueable deps ti=
[2024-02-12, 09:12:23 EST] {taskinstance.py:2208} INFO - Starting attempt 11 of 11
[2024-02-12, 09:12:23 EST] {taskinstance.py:2229} INFO - Executing  on 2024-02-11 00:00:00+00:00
[2024-02-12, 09:12:23 EST] {standard_task_runner.py:60} INFO - Started process 579 to run task
[2024-02-12, 09:12:23 EST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'test_gke_op', 'whoami', 'scheduled__2024-02-11T00:00:00+00:00', '--job-id', '42', '--raw', '--subdir', 'DAGS_FOLDER/test_deferrable_xcom.py', '--cfg-path', '/tmp/tmpkupoc4wd']
[2024-02-12, 09:12:23 EST] {standard_task_runner.py:88} INFO - Job 42: Subtask whoami
[2024-02-12, 09:12:23 EST] {task_command.py:423} INFO - Running  on host fcd7bd221fe9
[2024-02-12, 09:12:23 EST] {taskinstance.py:2529} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='gcp-data-platform' AIRFLOW_CTX_DAG_ID='test_gke_op' AIRFLOW_CTX_TASK_ID='whoami' AIRFLOW_CTX_EXECUTION_DATE='2024-02-11T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='11' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-11T00:00:00+00:00'
[2024-02-12, 09:12:23 EST] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-02-12, 09:12:23 EST] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-02-12, 09:12:23 EST] {kubernetes_engine.py:289} INFO - Fetching cluster (project_id=redacted-project-id, location=us-central1, cluster_name=***-gke-cluster)
[2024-02-12, 09:12:23 EST] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-02-12, 09:12:24 EST] {pod.py:1079} INFO - Building pod whoami-75olgaah with labels: {'dag_id': 'test_gke_op', 'task_id': 'whoami', 'run_id': 'scheduled__2024-02-11T0000000000-2e3e3ab6f', 'kubernetes_pod_operator': 'True', 'try_number': '11'}
[2024-02-12, 09:12:24 EST] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-02-12, 09:12:24 EST] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-02-12, 09:12:24 EST] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-02-12, 09:12:25 EST] {taskinstance.py:2382} INFO - Pausing task as DEFERRED. dag_id=test_gke_op, task_id=whoami, execution_date=20240211T000000, start_date=20240212T141223
[2024-02-12, 09:12:25 EST] {local_task_job_runner.py:231} INFO - Task exited with return code 100 (task deferral)
[2024-02-12, 09:12:26 EST] {pod.py:160} INFO - Checking pod 'whoami-75olgaah' in namespace '***-default'.
[2024-02-12, 09:12:26 EST] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-02-12, 09:12:26 EST] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-02-12, 09:12:29 EST] {triggerer_job_runner.py:604} INFO - Trigger test_gke_op/scheduled__2024-02-11T00:00:00+00:00/whoami/-1/11 (ID 15) fired: TriggerEvent<{'status': 'done', 'namespace': 'airflow-default', 'pod_name': 'whoami-75olgaah'}>
[2024-02-12, 09:12:31 EST] {taskinstance.py:1994} INFO - Dependencies all met for dep_context=non-requeueable deps ti=
[2024-02-12, 09:12:31 EST] {taskinstance.py:1994} INFO - Dependencies all met for dep_context=requeueable deps ti=
[2024-02-12, 09:12:31 EST] {taskinstance.py:2206} INFO - Resuming after deferral
[2024-02-12, 09:12:31 EST] {taskinstance.py:2229} INFO - Executing  on 2024-02-11 00:00:00+00:00
[2024-02-12, 09:12:31 EST] {standard_task_runner.py:60} INFO - Started process 649 to run task
[2024-02-12, 09:12:31 EST] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'test_gke_op', 'whoami', 'scheduled__2024-02-11T00:00:00+00:00', '--job-id', '43', '--raw', '--subdir', 'DAGS_FOLDER/test_deferrable_xcom.py', '--cfg-path', '/tmp/tmpbojzr3x_']
[2024-02-12, 09:12:31 EST] {standard_task_runner.py:88} INFO - Job 43: Subtask whoami
[2024-02-12, 09:12:31 EST] {task_command.py:423} INFO - Running  on host fcd7bd221fe9
[2024-02-12, 09:12:31 EST] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-02-12, 09:12:31 EST] {base.py:83} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-02-12, 09:12:31 EST] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-02-12, 09:12:34 EST] {taskinstance.py:2751} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/cncf/kubernetes/operators/pod.py", line 753, in execute_complete
    event["name"],
KeyError: 'name'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 446, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 416, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/airflow/models/baseoperator.py", line 1623, in resume_execution
    return execute_callable(context)
  File "/opt/airflow/airflow/providers/google/cloud/operators/kubernetes_engine.py", line 593, in execute_complete
    return super().execute_complete(context, event, **kwargs)
  File "/opt/airflow/airflow/providers/cncf/kubernetes/operators/pod.py", line 786, in execute_complete
    pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
  File "/opt/airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 611, in await_pod_completion
    remote_pod = self.read_pod(pod)
  File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/opt/airflow/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 713, in read_pod
    return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
AttributeError: 'NoneType' object has no attribute 'metadata'
[2024-02-12, 09:12:34 EST] {taskinstance.py:1166} INFO - Marking task as FAILED. dag_id=test_gke_op, task_id=whoami, execution_date=20240211T000000, start_date=20240212T141223, end_date=20240212T141234
[2024-02-12, 09:12:34 EST] {standard_task_runner.py:107} ERROR - Failed to execute job 43 for task whoami ('NoneType' object has no attribute 'metadata'; 649)
[2024-02-12, 09:12:34 EST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-02-12, 09:12:34 EST] {taskinstance.py:3332} INFO - 0 downstream tasks scheduled from follow-on schedule check

@pankajkoti (Member)

pankajkoti commented Feb 12, 2024

@vchiapaikeo Would you please help us understand the issue and how to reproduce it?

If I understand correctly, are we seeing an issue with the GKEStartPodOperator around the arguments deferrable and do_xcom_push?
Could you please provide details of the cases in which you're observing failures (with what values of these params)?

@vchiapaikeo (Contributor)

Subclasses of KPO still call the execute_complete method - https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/kubernetes_engine.py#L584-L593

Is this expected?

@vchiapaikeo (Contributor)

vchiapaikeo commented Feb 12, 2024

Also, you changed name to pod_name. Did you mean to do that?

[screenshot]

That most likely explains this:

Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/cncf/kubernetes/operators/pod.py", line 753, in execute_complete
    event["name"],
KeyError: 'name'
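The failure mode is easy to reproduce in isolation: any consumer written against the old trigger contract indexes the event dict by "name", which raises as soon as the trigger emits "pod_name" instead (illustrative sketch, not the provider code):

```python
def old_style_execute_complete(event: dict) -> str:
    # Consumers of the old trigger contract index the event by "name".
    return event["name"]


# Event shape emitted after the rename in this PR (taken from the logs above).
new_event = {"status": "done", "namespace": "airflow-default", "pod_name": "whoami-75olgaah"}

try:
    old_style_execute_complete(new_event)
except KeyError:
    # The renamed key breaks every caller that still expects "name".
    pass
```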

@pankajkoti (Member)

pankajkoti commented Feb 12, 2024

Subclasses of KPO still call the execute_complete method - https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/kubernetes_engine.py#L584-L593

Is this expected?

Yes, I think that should not break. The existing public methods should continue to work as they did.

I think you've caught the cause of the issue in #37279 (comment). Thanks for the help in testing and identifying it.

We will fix this

cc: @pankajastro

@pankajastro (Member, Author)

pankajastro commented Feb 12, 2024

hmm, looks like the "name" => "pod_name" rename is the issue then

@pankajastro (Member, Author)

Hey @vchiapaikeo would it be possible for you to test #37363 with your use case?

pankajastro added a commit to astronomer/airflow that referenced this pull request Feb 15, 2024
…run method

In apache#37279 I introduced periodic logging of the container.
During the process, I also changed a few event dict key names,
which is problematic for anyone extending the KPO trigger.
Also, the current execute_complete method was unused in the KPO operator
and was problematic for anyone using it in an extended class, since
the trigger can now also emit an event while the pod is in an intermediate state.
One reported issue: apache#37279 (comment)
In this PR I'm restoring the trigger event dict structure
and deprecating the execute_complete method.
potiuk pushed a commit that referenced this pull request Feb 15, 2024
…run method (#37454)

pankajkoti added a commit to astronomer/airflow that referenced this pull request Feb 18, 2024
I am observing an issue with the recent deferrable KPO changes in
PRs apache#37279 and apache#37454:
when the pod fails to start within the specified timeout value,
the KPO task hangs forever, whereas it is expected to fail after
the timeout. This PR fixes the issue by correcting a logical error
in detecting whether the timeout has elapsed, so that the timeout
trigger event is raised.
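The class of bug described above can be illustrated with a minimal elapsed-time check (hypothetical function; the real logic lives in the KPO trigger):

```python
from datetime import datetime, timedelta


def startup_timed_out(start: datetime, now: datetime, startup_timeout: int) -> bool:
    """Return True once the pod has failed to start within `startup_timeout`
    seconds. Getting this comparison wrong (wrong reference point or wrong
    inequality) means the condition never fires, so the task keeps deferring
    forever instead of failing with a timeout trigger event."""
    return now - start > timedelta(seconds=startup_timeout)


start = datetime(2024, 2, 18, 12, 0, 0)
assert not startup_timed_out(start, start + timedelta(seconds=30), 120)
assert startup_timed_out(start, start + timedelta(seconds=121), 120)
```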
pankajastro pushed a commit that referenced this pull request Feb 18, 2024
…#37514)

@jonathonbattista

Should this already be compatible with the GKEStartPodOperator out of the box, or does it also need to be updated?

I am on apache-airflow-providers-cncf-kubernetes v8.1.1 and apache-airflow-providers-google v10.17.0.

I have logging_interval set, but it's not polling the logs when the task is deferred:
https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/kubernetes_engine.py#L792

from airflow import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator
from airflow.utils.dates import days_ago

with DAG(
    "gke_start_pod_test",
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["test"],
) as dag:
    gke_start_pod_test = GKEStartPodOperator(
        task_id="sleep",
        project_id="xxxxx",
        location="xxx",
        cluster_name="xxxxx",
        do_xcom_push=True,
        namespace="default",
        image="ubuntu:jammy",
        cmds=["sh", "-c", "timeout 300 bash -c 'while true; do echo \"meow\"; sleep 3; done'"],
        name="test-sleep",
        in_cluster=False,
        on_finish_action="delete_pod",
        deferrable=True,
        get_logs=True,
        logging_interval=10,
    )

@pankajkoti (Member)

I think it should work out of the box. @pankajastro could you please test this once?

@pankajastro (Member, Author)

hmm, this will require small changes: PR #39348

kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Jul 19, 2024
… (#37514)

I am observing an issue with the recent deferrable KPO changes in
PRs apache/airflow#37279 and apache/airflow#37454:
when the pod fails to start within the specified timeout value,
the KPO task hangs forever, whereas it is expected to fail after
the timeout. This PR fixes the issue by correcting a logical error
in detecting whether the timeout has elapsed, so that the timeout
trigger event is raised.

GitOrigin-RevId: 6412b06a7b35a0743656dd3b2160f390f40108c2
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Sep 20, 2024
… (#37514)

kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Nov 9, 2024
… (#37514)
