Skip to content

Commit

Permalink
Add logging support for init containers in KubernetesPodOperator (#42498
Browse files Browse the repository at this point in the history
)

This change adds an option to print logs for init containers. The init_container_logs argument enables the display of logs specifically for spec.initContainers (not spec.containers).

Fixes: #42498
  • Loading branch information
mrk-andreev committed Nov 10, 2024
1 parent 63b2bbd commit bbb224d
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 2 deletions.
97 changes: 97 additions & 0 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,103 @@ class MyK8SPodOperator(KubernetesPodOperator):
)
assert MyK8SPodOperator(task_id=str(uuid4())).base_container_name == "tomato-sauce"

def test_init_container_logs(self, mock_get_connection):
marker_from_init_container = f"{uuid4()}"
marker_from_main_container = f"{uuid4()}"
progress_callback = MagicMock()
init_container = k8s.V1Container(
name="init-container",
image="busybox",
command=["sh", "-cx"],
args=[f"echo {marker_from_init_container}"],
)
k = KubernetesPodOperator(
namespace="default",
image="busybox",
cmds=["sh", "-cx"],
arguments=[f"echo {marker_from_main_container}"],
labels=self.labels,
task_id=str(uuid4()),
in_cluster=False,
do_xcom_push=False,
startup_timeout_seconds=5,
init_containers=[init_container],
init_container_logs=True,
progress_callback=progress_callback,
)
context = create_context(k)
k.execute(context)

calls = progress_callback.call_args_list
assert any(marker_from_init_container in "".join(c.args) for c in calls)
assert any(marker_from_main_container in "".join(c.args) for c in calls)

def test_init_container_logs_filtered(self, mock_get_connection):
marker_from_init_container_to_log_1 = f"{uuid4()}"
marker_from_init_container_to_log_2 = f"{uuid4()}"
marker_from_init_container_to_ignore = f"{uuid4()}"
marker_from_main_container = f"{uuid4()}"
progress_callback = MagicMock()
init_container_to_log_1 = k8s.V1Container(
name="init-container-to-log-1",
image="busybox",
command=["sh", "-cx"],
args=[f"echo {marker_from_init_container_to_log_1}"],
)
init_container_to_log_2 = k8s.V1Container(
name="init-container-to-log-2",
image="busybox",
command=["sh", "-cx"],
args=[f"echo {marker_from_init_container_to_log_2}"],
)
init_container_to_ignore = k8s.V1Container(
name="init-container-to-ignore",
image="busybox",
command=["sh", "-cx"],
args=[f"echo {marker_from_init_container_to_ignore}"],
)
k = KubernetesPodOperator(
namespace="default",
image="busybox",
cmds=["sh", "-cx"],
arguments=[f"echo {marker_from_main_container}"],
labels=self.labels,
task_id=str(uuid4()),
in_cluster=False,
do_xcom_push=False,
startup_timeout_seconds=5,
init_containers=[
init_container_to_log_1,
init_container_to_log_2,
init_container_to_ignore,
],
init_container_logs=[
# not same order as defined in init_containers
"init-container-to-log-2",
"init-container-to-log-1",
],
progress_callback=progress_callback,
)
context = create_context(k)
k.execute(context)

calls = progress_callback.call_args_list
assert any(marker_from_init_container_to_log_1 in "".join(c.args) for c in calls)
assert any(marker_from_init_container_to_log_2 in "".join(c.args) for c in calls)
assert not any(marker_from_init_container_to_ignore in "".join(c.args) for c in calls)
assert any(marker_from_main_container in "".join(c.args) for c in calls)

# check that we consume logs in order as it was defined in pod spec
def find_index(sub_str: str, arr: list[str]):
return next(i for i, s in enumerate(arr) if sub_str in s)

calls_args = ["".join(c.args) for c in calls]
assert (
find_index(marker_from_init_container_to_log_1, calls_args)
< find_index(marker_from_init_container_to_log_2, calls_args)
< find_index(marker_from_main_container, calls_args)
)


def test_hide_sensitive_field_in_templated_fields_on_error(caplog, monkeypatch):
logger = logging.getLogger("airflow.task")
Expand Down
33 changes: 33 additions & 0 deletions providers/src/airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ class KubernetesPodOperator(BaseOperator):
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:param startup_check_interval_seconds: interval in seconds to check if the pod has already started
:param get_logs: get the stdout of the base container as logs of the tasks.
:param init_container_logs: list of init containers whose logs will be published to stdout
Takes a sequence of containers, a single container name or True. If True,
all the containers logs are published.
:param container_logs: list of containers whose logs will be published to stdout
Takes a sequence of containers, a single container name or True. If True,
all the containers logs are published. Works in conjunction with get_logs param.
Expand Down Expand Up @@ -287,6 +290,7 @@ def __init__(
startup_check_interval_seconds: int = 5,
get_logs: bool = True,
base_container_name: str | None = None,
init_container_logs: Iterable[str] | str | Literal[True] | None = None,
container_logs: Iterable[str] | str | Literal[True] | None = None,
image_pull_policy: str | None = None,
annotations: dict | None = None,
Expand Down Expand Up @@ -361,6 +365,7 @@ def __init__(
# Fallback to the class variable BASE_CONTAINER_NAME here instead of via default argument value
# in the init method signature, to be compatible with subclasses overloading the class variable value.
self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
self.init_container_logs = init_container_logs
self.container_logs = container_logs or self.base_container_name
self.image_pull_policy = image_pull_policy
self.node_selector = node_selector or {}
Expand Down Expand Up @@ -620,6 +625,9 @@ def execute_sync(self, context: Context):
self.callbacks.on_pod_creation(
pod=self.remote_pod, client=self.client, mode=ExecutionMode.SYNC
)

self.await_init_containers_completion(pod=self.pod)

self.await_pod_start(pod=self.pod)
if self.callbacks:
self.callbacks.on_pod_starting(
Expand Down Expand Up @@ -655,6 +663,31 @@ def execute_sync(self, context: Context):
if self.do_xcom_push:
return result

@tenacity.retry(
wait=tenacity.wait_exponential(max=15),
retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
reraise=True,
)
def await_init_containers_completion(self, pod: k8s.V1Pod):
try:
if self.init_container_logs:
self.pod_manager.fetch_requested_init_container_logs(
pod=pod,
init_containers=self.init_container_logs,
follow_logs=True,
)
except kubernetes.client.exceptions.ApiException as exc:
if exc.status and str(exc.status) == "401":
self.log.warning(
"Failed to check container status due to permission error. Refreshing credentials and retrying."
)
self._refresh_cached_properties()
self.pod_manager.read_pod(
pod=pod
) # attempt using refreshed credentials, raises if still invalid
raise PodCredentialsExpiredFailure("Kubernetes credentials expired, retrying after refresh.")
raise exc

@tenacity.retry(
wait=tenacity.wait_exponential(max=15),
retry=tenacity.retry_if_exception_type(PodCredentialsExpiredFailure),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import annotations

import enum
import itertools
import json
import math
import time
Expand Down Expand Up @@ -118,7 +119,13 @@ def get_xcom_sidecar_container_resources(self) -> str | None:

def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None:
"""Retrieve container status."""
container_statuses = pod.status.container_statuses if pod and pod.status else None
if pod and pod.status:
container_statuses = itertools.chain(
pod.status.container_statuses, pod.status.init_container_statuses
)
else:
container_statuses = None

if container_statuses:
# In general the variable container_statuses can store multiple items matching different containers.
# The following generator expression yields all items that have name equal to the container_name.
Expand Down Expand Up @@ -522,7 +529,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None
time.sleep(1)

def _reconcile_requested_log_containers(
self, requested: Iterable[str] | str | bool, actual: list[str], pod_name
self, requested: Iterable[str] | str | bool | None, actual: list[str], pod_name
) -> list[str]:
"""Return actual containers based on requested."""
containers_to_log = []
Expand Down Expand Up @@ -565,6 +572,30 @@ def _reconcile_requested_log_containers(
self.log.error("Could not retrieve containers for the pod: %s", pod_name)
return containers_to_log

def fetch_requested_init_container_logs(
self, pod: V1Pod, init_containers: Iterable[str] | str | Literal[True] | None, follow_logs=False
) -> list[PodLoggingStatus]:
"""
Follow the logs of containers in the specified pod and publish it to airflow logging.
Returns when all the containers exit.
:meta private:
"""
pod_logging_statuses = []
all_containers = self.get_init_container_names(pod)
containers_to_log = self._reconcile_requested_log_containers(
requested=init_containers,
actual=all_containers,
pod_name=pod.metadata.name,
)
# sort by spec.initContainers because containers runs sequentially
containers_to_log = sorted(containers_to_log, key=lambda cn: all_containers.index(cn))
for c in containers_to_log:
status = self.fetch_container_logs(pod=pod, container_name=c, follow=follow_logs)
pod_logging_statuses.append(status)
return pod_logging_statuses

def fetch_requested_container_logs(
self, pod: V1Pod, containers: Iterable[str] | str | Literal[True], follow_logs=False
) -> list[PodLoggingStatus]:
Expand Down Expand Up @@ -692,6 +723,12 @@ def read_pod_logs(
post_termination_timeout=post_termination_timeout,
)

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_init_container_names(self, pod: V1Pod) -> list[str]:
"""Return container names from the POD except for the airflow-xcom-sidecar container."""
pod_info = self.read_pod(pod)
return [container_spec.name for container_spec in pod_info.spec.init_containers]

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
def get_container_names(self, pod: V1Pod) -> list[str]:
"""Return container names from the POD except for the airflow-xcom-sidecar container."""
Expand Down
4 changes: 4 additions & 0 deletions providers/tests/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ def remote_pod(running=None, not_running=None):
e = RemotePodMock()
e.status = RemotePodMock()
e.status.container_statuses = []
e.status.init_container_statuses = []
for r in not_running or []:
e.status.container_statuses.append(container(r, False))
for r in running or []:
Expand All @@ -659,6 +660,7 @@ def container(name, running):
p = RemotePodMock()
p.status = RemotePodMock()
p.status.container_statuses = []
p.status.init_container_statuses = []
pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses"))
pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty"))
pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 running"))
Expand Down Expand Up @@ -874,6 +876,7 @@ def remote_pod(succeeded=None, not_succeeded=None):
e = RemotePodMock()
e.status = RemotePodMock()
e.status.container_statuses = []
e.status.init_container_statuses = []
for r in not_succeeded or []:
e.status.container_statuses.append(container(r, False))
for r in succeeded or []:
Expand All @@ -894,6 +897,7 @@ def container(name, succeeded):
p = RemotePodMock()
p.status = RemotePodMock()
p.status.container_statuses = []
p.status.init_container_statuses = []
pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses"))
pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty"))
pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 succeeded"))
Expand Down

0 comments on commit bbb224d

Please sign in to comment.