Skip to content

Commit

Permalink
Add support logging in KubernetesPodOperator (#42498)
Browse files Browse the repository at this point in the history
This change adds an option to print logs for init containers. The get_init_containers_logs and init_container_logs functions allow displaying the logs of init containers.

Fixes: #42498
  • Loading branch information
mrk-andreev committed Nov 9, 2024
1 parent 63b2bbd commit d23bd7b
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 1 deletion.
32 changes: 32 additions & 0 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,38 @@ class MyK8SPodOperator(KubernetesPodOperator):
)
assert MyK8SPodOperator(task_id=str(uuid4())).base_container_name == "tomato-sauce"

def test_init_container_logs(self, mock_get_connection):
marker_from_init_container = f"{uuid4()}"
marker_from_main_container = f"{uuid4()}"
progress_callback = MagicMock()
init_container = k8s.V1Container(
name="init-container",
image="busybox",
command=["sh", "-cx"],
args=[f"echo {marker_from_init_container}"],
)
k = KubernetesPodOperator(
namespace="default",
image="busybox",
cmds=["sh", "-cx"],
arguments=[f"echo {marker_from_main_container}"],
labels=self.labels,
task_id=str(uuid4()),
in_cluster=False,
do_xcom_push=False,
startup_timeout_seconds=5,
init_containers=[init_container],
get_init_containers_logs=True,
init_container_logs=["init-container"],
progress_callback=progress_callback,
)
context = create_context(k)
k.execute(context)

calls = progress_callback.call_args_list
assert any(marker_from_init_container in "".join(c.args) for c in calls)
assert any(marker_from_main_container in "".join(c.args) for c in calls)


def test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):
logger = logging.getLogger("airflow.task")
Expand Down
37 changes: 37 additions & 0 deletions providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ class KubernetesPodOperator(BaseOperator):
:param labels: labels to apply to the Pod. (templated)
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:param startup_check_interval_seconds: interval in seconds to check if the pod has already started
:param get_init_containers_logs: get the stdout of the init containers as logs of the tasks.
:param get_logs: get the stdout of the base container as logs of the tasks.
:param init_container_logs: list of init containers whose logs will be published to stdout
Takes a sequence of containers, a single container name or True. If True,
all the containers logs are published. Works in conjunction with get_init_containers_logs param.
:param container_logs: list of containers whose logs will be published to stdout
Takes a sequence of containers, a single container name or True. If True,
all the containers logs are published. Works in conjunction with get_logs param.
Expand Down Expand Up @@ -285,8 +289,10 @@ def __init__(
reattach_on_restart: bool = True,
startup_timeout_seconds: int = 120,
startup_check_interval_seconds: int = 5,
get_init_containers_logs: bool = False,
get_logs: bool = True,
base_container_name: str | None = None,
init_container_logs: Iterable[str] | str | Literal[True] | None = None,
container_logs: Iterable[str] | str | Literal[True] | None = None,
image_pull_policy: str | None = None,
annotations: dict | None = None,
Expand Down Expand Up @@ -358,9 +364,11 @@ def __init__(
self.cluster_context = cluster_context
self.reattach_on_restart = reattach_on_restart
self.get_logs = get_logs
self.get_init_containers_logs = get_init_containers_logs
# Fallback to the class variable BASE_CONTAINER_NAME here instead of via default argument value
# in the init method signature, to be compatible with subclasses overloading the class variable value.
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.init_container_logs = init_container_logs
self.container_logs = container_logs or self.base_container_name
self.image_pull_policy = image_pull_policy
self.node_selector = node_selector or {}
Expand Down Expand Up @@ -620,6 +628,10 @@ def execute_sync(self, context: Context):
self.callbacks.on_pod_creation(
pod=self.remote_pod, client=self.client, mode=ExecutionMode.SYNC
)

if self.get_init_containers_logs:
self.await_init_containers_completion(pod=self.pod)

self.await_pod_start(pod=self.pod)
if self.callbacks:
self.callbacks.on_pod_starting(
Expand Down Expand Up @@ -655,6 +667,31 @@ def execute_sync(self, context: Context):
if self.do_xcom_push:
return result

@tenacity.retry(
wait=tenacity.wait_exponential(max=15),
retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
reraise=True,
)
def await_init_containers_completion(self, pod: k8s.V1Pod):
try:
if self.get_init_containers_logs:
self.pod_manager.fetch_requested_init_container_logs(
pod=pod,
init_containers=self.init_container_logs,
follow_logs=True,
)
except kubernetes.client.exceptions.ApiException as exc:
if exc.status and str(exc.status) == "401":
self.log.warning(
"Failed to check container status due to permission error. Refreshing credentials and retrying."
)
self._refresh_cached_properties()
self.pod_manager.read_pod(
pod=pod
) # attempt using refreshed credentials, raises if still invalid
raise PodCredentialsExpiredFailure("Kubernetes credentials expired, retrying after refresh.")
raise exc

@tenacity.retry(
wait=tenacity.wait_exponential(max=15),
retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import annotations

import enum
import itertools
import json
import math
import time
Expand Down Expand Up @@ -118,7 +119,13 @@ def get_xcom_sidecar_container_resources(self) -> str | None:

def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
"""Retrieve container status."""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if pod and pod.status:
container_statuses = itertools.chain(
pod.status.container_statuses, pod.status.init_container_statuses
)
else:
container_statuses = None

if container_statuses:
# In general the variable container_statuses can store multiple items matching different containers.
# The following generator expression yields all items that have name equal to the container_name.
Expand Down Expand Up @@ -565,6 +572,28 @@ def _reconcile_requested_log_containers(
self.log.error("Could not retrieve containers for the pod: %s", pod_name)
return containers_to_log

def fetch_requested_init_container_logs(
self, pod: V1Pod, init_containers: Iterable[str] | str | Literal[True], follow_logs=False
) -> list[PodLoggingStatus]:
"""
Follow the logs of containers in the specified pod and publish it to airflow logging.
Returns when all the containers exit.
:meta private:
"""
pod_logging_statuses = []
all_containers = self.get_init_container_names(pod)
containers_to_log = self._reconcile_requested_log_containers(
requested=init_containers,
actual=all_containers,
pod_name=pod.metadata.name,
)
for c in containers_to_log:
status = self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs)
pod_logging_statuses.append(status)
return pod_logging_statuses

def fetch_requested_container_logs(
self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs=False
) -> list[PodLoggingStatus]:
Expand Down Expand Up @@ -692,6 +721,12 @@ def read_pod_logs(
post_termination_timeout=post_termination_timeout,
)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_init_container_names(self, pod: V1Pod) -> list[str]:
"""Return container names from the POD except for the airflow-xcom-sidecar container."""
pod_info = self.read_pod(pod)
return [container_spec.name for container_spec in pod_info.spec.init_containers]

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_container_names(self, pod: V1Pod) -> list[str]:
"""Return container names from the POD except for the airflow-xcom-sidecar container."""
Expand Down

0 comments on commit d23bd7b

Please sign in to comment.