From 95da6b2392503a5a01ae94b34520cdf9fe0c4582 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Thu, 14 Nov 2024 17:20:45 -0300 Subject: [PATCH] Fix wrong package stream resulting in 200 Ok Assuming we want to keep our stream-redirect approach on the content-app, We cant recover from wrong data already sent if the Remote happens to be corrupted (contains wrong binaries). In order to not give a 200 reponse to client, we decided to close the connection as soon as the request handler realizes the checksum is wrong. That only happens after we already sent the whole blob minus EOF, so we close the connection before sending the EOF. Additionally, we put some message on the logs for admins to see and have a chance to manually fix the remote/remote_artifacts. Co-authored-by: Matthias Dellweg <2500@gmx.de> fixes #5012 --- CHANGES/5012.bugfix | 3 ++ pulp_file/pytest_plugin.py | 4 +- pulpcore/content/handler.py | 33 ++++++++++++-- .../api/using_plugin/test_content_delivery.py | 43 ++++++++++++++++++- 4 files changed, 77 insertions(+), 6 deletions(-) create mode 100644 CHANGES/5012.bugfix diff --git a/CHANGES/5012.bugfix b/CHANGES/5012.bugfix new file mode 100644 index 0000000000..199235e30e --- /dev/null +++ b/CHANGES/5012.bugfix @@ -0,0 +1,3 @@ +Fixed content-app behavior for the case where the client would get a 200 response for a package +streamed from a Remote which did not match the expected checksum. +Now, the connection is closed before finalizing the response. diff --git a/pulp_file/pytest_plugin.py b/pulp_file/pytest_plugin.py index a995b714c0..2fe4624469 100644 --- a/pulp_file/pytest_plugin.py +++ b/pulp_file/pytest_plugin.py @@ -83,8 +83,8 @@ def file_fixtures_root(tmp_path): @pytest.fixture def write_3_iso_file_fixture_data_factory(file_fixtures_root): - def _write_3_iso_file_fixture_data_factory(name): - file_fixtures_root.joinpath(name).mkdir() + def _write_3_iso_file_fixture_data_factory(name, overwrite=False): + file_fixtures_root.joinpath(name).mkdir(exist_ok=overwrite) file1 = generate_iso(file_fixtures_root.joinpath(f"{name}/1.iso")) file2 = generate_iso(file_fixtures_root.joinpath(f"{name}/2.iso")) file3 = generate_iso(file_fixtures_root.joinpath(f"{name}/3.iso")) diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py index 60c5aa2c4a..339837348c 100644 --- a/pulpcore/content/handler.py +++ b/pulpcore/content/handler.py @@ -3,6 +3,8 @@ from multidict import CIMultiDict import os import re +import socket +import struct from gettext import gettext as _ from aiohttp.client_exceptions import ClientResponseError, ClientConnectionError @@ -54,7 +56,10 @@ cache_key, ) -from pulpcore.exceptions import UnsupportedDigestValidationError # noqa: E402 +from pulpcore.exceptions import ( # noqa: E402 + UnsupportedDigestValidationError, + DigestValidationError, +) from pulpcore.metrics import artifacts_size_counter # noqa: E402 from jinja2 import Template # noqa: E402: module level not at top of file @@ -1125,13 +1130,25 @@ async def finalize(): await original_finalize() downloader = remote.get_downloader( - remote_artifact=remote_artifact, headers_ready_callback=handle_response_headers + remote_artifact=remote_artifact, + headers_ready_callback=handle_response_headers, ) original_handle_data = downloader.handle_data downloader.handle_data = handle_data original_finalize = downloader.finalize downloader.finalize = finalize - download_result = await downloader.run() + try: + download_result = await downloader.run() + except DigestValidationError: + await downloader.session.close() + close_tcp_connection(request.transport._sock) + raise RuntimeError( + f"We tried streaming {remote_artifact.url!r} to the client, but it " + "failed checkusm validation. " + "At this point, we cant recover from wrong data already sent, " + "so we are forcing the connection to close. " + "If this error persists, the remote server might be corrupted." + ) if content_length := response.headers.get("Content-Length"): response.headers["X-PULP-ARTIFACT-SIZE"] = content_length @@ -1149,3 +1166,13 @@ async def finalize(): if response.status == 404: raise HTTPNotFound() return response + + +def close_tcp_connection(sock): + """Configure socket to close TCP connection immediately.""" + try: + l_onoff = 1 + l_linger = 0 # 0 seconds timeout - immediate close + sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", l_onoff, l_linger)) + except (socket.error, OSError) as e: + log.warning(f"Error configuring socket for force close: {e}") diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py index 1054afc042..11735ca04d 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_delivery.py @@ -1,8 +1,9 @@ """Tests related to content delivery.""" -from aiohttp.client_exceptions import ClientResponseError +from aiohttp.client_exceptions import ClientResponseError, ClientPayloadError import hashlib import pytest +import subprocess from urllib.parse import urljoin from pulpcore.client.pulp_file import ( @@ -102,3 +103,43 @@ def test_remote_artifact_url_update( actual_checksum = hashlib.sha256(downloaded_file.body).hexdigest() expected_checksum = expected_file_list[0][1] assert expected_checksum == actual_checksum + + +@pytest.mark.parallel +def test_remote_content_changed_with_on_demand( + write_3_iso_file_fixture_data_factory, + file_repo_with_auto_publish, + file_remote_ssl_factory, + file_bindings, + monitor_task, + file_distribution_factory, +): + """ + GIVEN a remote synced on demand with fileA (e.g, digest=123), + WHEN on the remote server, fileA changed its content (e.g, digest=456), + THEN retrieving fileA from the content app will cause a connection-close/incomplete-response. + """ + # GIVEN + basic_manifest_path = write_3_iso_file_fixture_data_factory("basic") + remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="on_demand") + body = RepositorySyncURL(remote=remote.pulp_href) + monitor_task( + file_bindings.RepositoriesFileApi.sync(file_repo_with_auto_publish.pulp_href, body).task + ) + repo = file_bindings.RepositoriesFileApi.read(file_repo_with_auto_publish.pulp_href) + distribution = file_distribution_factory(repository=repo.pulp_href) + expected_file_list = list(get_files_in_manifest(remote.url)) + + # WHEN + write_3_iso_file_fixture_data_factory("basic", overwrite=True) + + # THEN + get_url = urljoin(distribution.base_url, expected_file_list[0][0]) + with pytest.raises(ClientPayloadError, match="Response payload is not completed"): + download_file(get_url) + + # Assert again with curl just to be sure. + result = subprocess.run(["curl", "-v", get_url], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + assert result.returncode == 18 + assert b"* Closing connection 0" in result.stderr + assert b"curl: (18) transfer closed with outstanding read data remaining" in result.stderr