Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make BuildError serialisable by Celery in case of retry #641

Merged
merged 6 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Fixed

- Raise a serializable Exception so that CeleryRetryError won't crash ([#641](https://github.com/Substra/substra-backend/pull/641))
- Do not retry on non-timeout build errors ([#641](https://github.com/Substra/substra-backend/pull/641))

## [0.36.1](https://github.com/Substra/substra-backend/releases/tag/0.36.1) 2023-04-21

### Fixed
Expand Down
18 changes: 16 additions & 2 deletions backend/substrapp/compute_tasks/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class _ComputeTaskError(RuntimeError):
error_type: ComputeTaskErrorType


class BuildError(_ComputeTaskError, CeleryRetryError):
class BuildRetryError(_ComputeTaskError, CeleryRetryError):
"""An error occurred during the build of a container image.

Args:
Expand All @@ -72,7 +72,21 @@ class BuildError(_ComputeTaskError, CeleryRetryError):

def __init__(self, logs: str, *args, **kwargs):
self.logs = BytesIO(str.encode(logs))
super().__init__(*args, **kwargs)
super().__init__(logs, *args, **kwargs)


class BuildError(_ComputeTaskError, CeleryNoRetryError):
"""An error occurred during the build of a container image.

Args:
logs (str): the container image build logs
"""

error_type = ComputeTaskErrorType.BUILD_ERROR

def __init__(self, logs: str, *args, **kwargs):
self.logs = BytesIO(str.encode(logs))
super().__init__(logs, *args, **kwargs)


class ExecutionError(_ComputeTaskError, CeleryNoRetryError):
Expand Down
43 changes: 24 additions & 19 deletions backend/substrapp/compute_tasks/image_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.conf import settings

import orchestrator
from substrapp import exceptions
from substrapp.compute_tasks import errors as compute_task_errors
from substrapp.compute_tasks import utils
from substrapp.compute_tasks.compute_pod import Label
Expand Down Expand Up @@ -114,6 +115,15 @@ def _get_entrypoint_from_dockerfile(dockerfile_dir: str) -> list[str]:
raise compute_task_errors.BuildError("Invalid Dockerfile: Cannot find ENTRYPOINT")


def _delete_kaniko_pod(create_pod: bool, k8s_client: kubernetes.client.CoreV1Api, pod_name: str) -> str:
logs = ""
if create_pod:
logs = get_pod_logs(k8s_client, pod_name, KANIKO_CONTAINER_NAME, ignore_pod_not_found=True)
delete_pod(k8s_client, pod_name)
logger.info(logs or "", pod_name=pod_name)
return logs


@timeit
def _build_container_image(path: str, tag: str) -> None:
_assert_dockerfile_exist(path)
Expand All @@ -130,35 +140,30 @@ def _build_container_image(path: str, tag: str) -> None:
pod = _build_pod(path, tag)
k8s_client.create_namespaced_pod(body=pod, namespace=NAMESPACE)
except kubernetes.client.ApiException as e:
raise compute_task_errors.BuildError(
raise compute_task_errors.BuildRetryError(
f"Error creating pod {NAMESPACE}/{pod_name}. Reason: {e.reason}, status: {e.status}, body: {e.body}"
) from e

build_exc = None

try:
watch_pod(k8s_client, pod_name)

except Exception as e:
# In case of concurrent builds, it may fail. Check if the image exists.
if container_image_exists(tag):
logger.warning(
f"Build of container image {tag} failed, probably because it was done by a concurrent build",
exc_info=True,
)
return
build_exc = e

finally:
logs = None

if create_pod:
logs = get_pod_logs(k8s_client, pod_name, KANIKO_CONTAINER_NAME, ignore_pod_not_found=True)
delete_pod(k8s_client, pod_name)
for line in (logs or "").split("\n"):
logger.info(line, pod_name=pod_name)

if build_exc:
err_msg = str(build_exc)
if logs:
err_msg += "\n\n" + logs
raise compute_task_errors.BuildError(err_msg)

logs = _delete_kaniko_pod(create_pod, k8s_client, pod_name)

if isinstance(e, exceptions.PodTimeoutError):
raise compute_task_errors.BuildRetryError(logs) from e
else: # exceptions.PodError or other
raise compute_task_errors.BuildError(logs) from e

_delete_kaniko_pod(create_pod, k8s_client, pod_name)


def _assert_dockerfile_exist(dockerfile_path):
Expand Down
96 changes: 49 additions & 47 deletions backend/substrapp/tests/compute_tasks/test_image_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,53 +22,55 @@
"""


class TestBuildImageIfMissing:
def test_image_already_exists(self, mocker: MockerFixture, function: orchestrator.Function):
ds = mocker.Mock()
m_container_image_exists = mocker.patch(
"substrapp.compute_tasks.image_builder.container_image_exists", return_value=True
)
function_image_tag = utils.container_image_tag_from_function(function)

image_builder.build_image_if_missing(datastore=ds, function=function)

m_container_image_exists.assert_called_once_with(function_image_tag)

def test_image_build_needed(self, mocker: MockerFixture, function: orchestrator.Function):
ds = mocker.Mock()
m_container_image_exists = mocker.patch(
"substrapp.compute_tasks.image_builder.container_image_exists", return_value=False
)
m_build_function_image = mocker.patch("substrapp.compute_tasks.image_builder._build_function_image")
function_image_tag = utils.container_image_tag_from_function(function)

image_builder.build_image_if_missing(datastore=ds, function=function)

m_container_image_exists.assert_called_once_with(function_image_tag)
m_build_function_image.assert_called_once()
assert m_build_function_image.call_args.args[1] == function


class TestGetEntrypointFromDockerfile:
def test_valid_dockerfile(self, tmp_path: pathlib.Path):
dockerfile_path = tmp_path / "Dockerfile"
dockerfile_path.write_text(_VALID_DOCKERFILE)
entrypoint = image_builder._get_entrypoint_from_dockerfile(str(tmp_path))

assert entrypoint == ["python3", "myfunction.py"]

@pytest.mark.parametrize(
"dockerfile,expected_exc_content",
[
pytest.param(_NO_ENTRYPOINT, "Invalid Dockerfile: Cannot find ENTRYPOINT", id="no entrypoint"),
pytest.param(_ENTRYPOINT_SHELL_FORM, "Invalid ENTRYPOINT", id="shell form"),
],
def test_build_image_if_missing_image_already_exists(mocker: MockerFixture, function: orchestrator.Function):
ds = mocker.Mock()
m_container_image_exists = mocker.patch(
"substrapp.compute_tasks.image_builder.container_image_exists", return_value=True
)
def test_invalid_dockerfile(self, tmp_path: pathlib.Path, dockerfile: str, expected_exc_content: str):
dockerfile_path = tmp_path / "Dockerfile"
dockerfile_path.write_text(dockerfile)
function_image_tag = utils.container_image_tag_from_function(function)

with pytest.raises(compute_task_errors.BuildError) as exc:
image_builder._get_entrypoint_from_dockerfile(str(tmp_path))
image_builder.build_image_if_missing(datastore=ds, function=function)

assert expected_exc_content in bytes.decode(exc.value.logs.read())
m_container_image_exists.assert_called_once_with(function_image_tag)


def test_build_image_if_missing_image_build_needed(mocker: MockerFixture, function: orchestrator.Function):
ds = mocker.Mock()
m_container_image_exists = mocker.patch(
"substrapp.compute_tasks.image_builder.container_image_exists", return_value=False
)
m_build_function_image = mocker.patch("substrapp.compute_tasks.image_builder._build_function_image")
function_image_tag = utils.container_image_tag_from_function(function)

image_builder.build_image_if_missing(datastore=ds, function=function)

m_container_image_exists.assert_called_once_with(function_image_tag)
m_build_function_image.assert_called_once()
assert m_build_function_image.call_args.args[1] == function


def test_get_entrypoint_from_dockerfile_valid_dockerfile(tmp_path: pathlib.Path):
dockerfile_path = tmp_path / "Dockerfile"
dockerfile_path.write_text(_VALID_DOCKERFILE)
entrypoint = image_builder._get_entrypoint_from_dockerfile(str(tmp_path))

assert entrypoint == ["python3", "myfunction.py"]


@pytest.mark.parametrize(
"dockerfile,expected_exc_content",
[
pytest.param(_NO_ENTRYPOINT, "Invalid Dockerfile: Cannot find ENTRYPOINT", id="no entrypoint"),
pytest.param(_ENTRYPOINT_SHELL_FORM, "Invalid ENTRYPOINT", id="shell form"),
],
)
def test_get_entrypoint_from_dockerfile_invalid_dockerfile(
tmp_path: pathlib.Path, dockerfile: str, expected_exc_content: str
):
dockerfile_path = tmp_path / "Dockerfile"
dockerfile_path.write_text(dockerfile)

with pytest.raises(compute_task_errors.BuildError) as exc:
image_builder._get_entrypoint_from_dockerfile(str(tmp_path))

assert expected_exc_content in bytes.decode(exc.value.logs.read())
27 changes: 27 additions & 0 deletions backend/substrapp/tests/test_kubernetes_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,30 @@ def test_get_service_node_port():
service.spec.ports[0].node_port = 9000
port = substrapp.kubernetes_utils.get_service_node_port("my_service")
assert port == 9000


def test_get_pod_logs(mocker):
mocker.patch("kubernetes.client.CoreV1Api.read_namespaced_pod_log", return_value="Super great logs")
k8s_client = kubernetes.client.CoreV1Api()
logs = substrapp.kubernetes_utils.get_pod_logs(k8s_client, "pod_name", "container_name", ignore_pod_not_found=True)
assert logs == "Super great logs"


def test_get_pod_logs_not_found():
with mock.patch("kubernetes.client.CoreV1Api.read_namespaced_pod_log") as read_pod:
read_pod.side_effect = kubernetes.client.ApiException(404, "Not Found")
k8s_client = kubernetes.client.CoreV1Api()
logs = substrapp.kubernetes_utils.get_pod_logs(
k8s_client, "pod_name", "container_name", ignore_pod_not_found=True
)
assert "Pod not found" in logs


def test_get_pod_logs_bad_request():
with mock.patch("kubernetes.client.CoreV1Api.read_namespaced_pod_log") as read_pod:
read_pod.side_effect = kubernetes.client.ApiException(400, "Bad Request")
k8s_client = kubernetes.client.CoreV1Api()
logs = substrapp.kubernetes_utils.get_pod_logs(
k8s_client, "pod_name", "container_name", ignore_pod_not_found=True
)
assert "pod_name" in logs