Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logs with leading spaces in the Docker operator (#33692) #43840

Merged
merged 1 commit into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions providers/src/airflow/providers/docker/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
from logging import Logger

from docker import APIClient
from docker.types import DeviceRequest

Expand All @@ -62,6 +64,16 @@ def stringify(line: str | bytes):
return line


def fetch_logs(log_stream, log: Logger):
    """
    Consume a log stream, forwarding every line to ``log`` and collecting the chunks.

    Each chunk is passed through ``stringify`` and then right-stripped, so trailing
    whitespace is dropped while leading whitespace (e.g. the indentation of a
    traceback caret line) is kept intact.

    :param log_stream: Iterable yielding log chunks; a chunk may hold several
        newline-separated lines.
    :param log: Logger whose ``info`` method receives one call per line.
    :return: The right-stripped chunks, in the order they were read.
    """
    collected = []
    for raw_chunk in log_stream:
        # Only trailing whitespace is removed; leading spaces must survive.
        text = stringify(raw_chunk).rstrip()
        collected.append(text)
        # A single chunk can span multiple lines; emit them one by one.
        for single_line in text.split("\n"):
            log.info("%s", single_line)
    return collected


class DockerOperator(BaseOperator):
"""
Execute a command inside a docker container.
Expand Down Expand Up @@ -426,16 +438,11 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
tty=self.tty,
hostname=self.hostname,
)
logstream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True)
log_stream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container["Id"])

log_lines = []
for log_chunk in logstream:
log_chunk = stringify(log_chunk).strip()
log_lines.append(log_chunk)
for log_chunk_line in log_chunk.split("\n"):
self.log.info("%s", log_chunk_line)
log_lines = fetch_logs(log_stream, self.log)

result = self.cli.wait(self.container["Id"])
if result["StatusCode"] in self.skip_on_exit_code:
Expand Down
32 changes: 31 additions & 1 deletion providers/tests/docker/operators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.docker.exceptions import DockerContainerFailedException
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.operators.docker import DockerOperator, fetch_logs
from airflow.utils.task_instance_session import set_current_task_instance_session

TEST_CONN_ID = "docker_test_connection"
Expand Down Expand Up @@ -865,3 +865,33 @@ def test_partial_deprecated_skip_exit_code_ambiguous(
pytest.raises(ValueError, match="Conflicting `skip_on_exit_code` provided"),
):
ti.render_templates()

@pytest.mark.parametrize(
    "log_lines, expected_lines",
    [
        pytest.param(
            ["return self.main(*args, **kwargs)", " ^^^^^^^^^^^^^^^^"],
            ["return self.main(*args, **kwargs)", " ^^^^^^^^^^^^^^^^"],
            id="should-not-remove-leading-spaces",
        ),
        pytest.param(
            [" ^^^^^^^^^^^^^^^^ "],
            [" ^^^^^^^^^^^^^^^^"],
            id="should-remove-trailing-spaces",
        ),
    ],
)
@mock.patch("logging.Logger")
def test_fetch_logs(self, logger_mock, log_lines, expected_lines):
    """Check that ``fetch_logs`` keeps leading whitespace and drops trailing whitespace."""
    fetch_logs(log_lines, logger_mock)
    # Every expected line must have been logged exactly once, in order, via ``info("%s", line)``.
    expected_calls = [call("%s", line) for line in expected_lines]
    assert logger_mock.info.call_args_list == expected_calls