🎨Dask sidecar: use reproducible zipfile library #6571

Merged
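
For context: the standard zipfile module stamps each archive member with its current modification time, so zipping identical content at two different moments produces byte-different archives and therefore different hashes. repro-zipfile ships ReproducibleZipFile, a drop-in ZipFile subclass that writes fixed timestamps and normalized permission bits, so equal inputs yield bit-identical archives. A minimal sketch of the idea, independent of the sidecar code (file names are illustrative):

    import hashlib
    import time
    import zipfile

    from repro_zipfile import ReproducibleZipFile


    def sha256_of(path: str) -> str:
        with open(path, "rb") as f:
            return hashlib.sha256(f.read()).hexdigest()


    # write the same payload twice, far enough apart that plain
    # zipfile.ZipFile would record different member timestamps
    for name in ("a.zip", "b.zip"):
        with ReproducibleZipFile(name, mode="w", compression=zipfile.ZIP_STORED) as zf:
            zf.writestr("payload.txt", "same content")
        time.sleep(2)

    # ReproducibleZipFile pins the timestamps, so the archives match bit for bit
    assert sha256_of("a.zip") == sha256_of("b.zip")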
services/dask-sidecar/requirements/_base.in (1 addition, 0 deletions)
@@ -26,3 +26,4 @@ fsspec[http, s3] # sub types needed as we acces http and s3 here
 lz4 # for compression
 pydantic[email,dotenv]
 prometheus_client
+repro-zipfile
services/dask-sidecar/requirements/_base.txt (3 additions, 1 deletion)
@@ -325,7 +325,9 @@ referencing==0.29.3
     # jsonschema
     # jsonschema-specifications
 repro-zipfile==0.3.1
-    # via -r requirements/../../../packages/service-library/requirements/_base.in
+    # via
+    #   -r requirements/../../../packages/service-library/requirements/_base.in
+    #   -r requirements/_base.in
 requests==2.32.3
     # via opentelemetry-exporter-otlp-proto-http
 rich==13.7.1
@@ -5,13 +5,14 @@
 import time
 import zipfile
 from collections.abc import Awaitable, Callable
-from io import BytesIO
+from io import IOBase
 from pathlib import Path
 from typing import Any, Final, TypedDict, cast

 import aiofiles
 import aiofiles.tempfile
 import fsspec  # type: ignore[import-untyped]
+import repro_zipfile  # type: ignore[import-untyped]
 from pydantic import ByteSize, FileUrl, parse_obj_as
 from pydantic.networks import AnyUrl
 from servicelib.logging_utils import LogLevelInt, LogMessageStr
@@ -33,7 +34,7 @@ def _file_progress_cb(
     log_publishing_cb: LogPublishingCB,
     text_prefix: str,
     main_loop: asyncio.AbstractEventLoop,
-    **kwargs,
+    **kwargs,  # noqa: ARG001
 ):
     asyncio.run_coroutine_threadsafe(
         log_publishing_cb(
@@ -78,7 +79,7 @@ def _s3fs_settings_from_s3_settings(s3_settings: S3Settings) -> S3FsSettingsDict
     return s3fs_settings


-def _file_chunk_streamer(src: BytesIO, dst: BytesIO):
+def _file_chunk_streamer(src: IOBase, dst: IOBase):
     data = src.read(CHUNK_SIZE)
     segment_len = dst.write(data)
     return (data, segment_len)
@@ -98,6 +99,8 @@ async def _copy_file(
     with fsspec.open(src_url, mode="rb", **src_storage_kwargs) as src_fp, fsspec.open(
         dst_url, "wb", **dst_storage_kwargs
     ) as dst_fp:
+        assert isinstance(src_fp, IOBase)  # nosec
+        assert isinstance(dst_fp, IOBase)  # nosec
         file_size = getattr(src_fp, "size", None)
         data_read = True
         total_data_written = 0
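
Why IOBase rather than BytesIO: fsspec.open hands back file-like objects that are generally not BytesIO instances, so the narrower annotation on _file_chunk_streamer did not match what actually flows through it; IOBase is the common base class of Python file objects, and the two # nosec asserts above make that assumption explicit at runtime. A small standalone illustration under that assumption (not the sidecar's code):

    import tempfile
    from io import BytesIO, IOBase
    from pathlib import Path


    def stream_chunk(src: IOBase, dst: IOBase, chunk_size: int = 8192) -> int:
        """Copy one chunk from src to dst; return the number of bytes written."""
        data = src.read(chunk_size)
        return dst.write(data)


    # works for in-memory buffers...
    assert stream_chunk(BytesIO(b"hello"), BytesIO()) == 5

    # ...and for real on-disk files, which are IOBase but not BytesIO
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "data.bin"
        path.write_bytes(b"hello")
        with path.open("rb") as src:
            assert isinstance(src, IOBase) and not isinstance(src, BytesIO)
            assert stream_chunk(src, BytesIO()) == 5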
@@ -159,7 +162,7 @@ async def pull_file_from_remote(
     if src_mime_type == _ZIP_MIME_TYPE and target_mime_type != _ZIP_MIME_TYPE:
         await log_publishing_cb(f"Uncompressing '{dst_path.name}'...", logging.INFO)
         logger.debug("%s is a zip file and will be now uncompressed", dst_path)
-        with zipfile.ZipFile(dst_path, "r") as zip_obj:
+        with repro_zipfile.ReproducibleZipFile(dst_path, "r") as zip_obj:
             await asyncio.get_event_loop().run_in_executor(
                 None, zip_obj.extractall, dst_path.parents[0]
             )
@@ -248,7 +251,8 @@ async def push_file_to_remote(
                 f"Compressing '{src_path.name}' to '{archive_file_path.name}'...",
                 logging.INFO,
             )
-            with zipfile.ZipFile(
+
+            with repro_zipfile.ReproducibleZipFile(
                 archive_file_path, mode="w", compression=zipfile.ZIP_STORED
             ) as zfp:
                 await asyncio.get_event_loop().run_in_executor(
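
In both hunks the blocking zip work (extractall, write) is pushed off the event loop into the default thread-pool executor. A self-contained sketch of that pattern, assuming only repro-zipfile is installed (paths and names are illustrative, not the sidecar's API):

    import asyncio
    import zipfile
    from pathlib import Path

    from repro_zipfile import ReproducibleZipFile


    async def compress_one(src: Path, archive: Path) -> None:
        # ReproducibleZipFile is a drop-in ZipFile subclass, so the usual
        # modes and compression flags apply unchanged
        with ReproducibleZipFile(archive, mode="w", compression=zipfile.ZIP_STORED) as zfp:
            # zfp.write blocks on disk I/O; run it in the default executor so
            # the event loop stays free to serve other coroutines meanwhile
            await asyncio.get_event_loop().run_in_executor(None, zfp.write, src, src.name)


    src = Path("input.txt")
    src.write_text("hello")
    asyncio.run(compress_one(src, Path("output.zip")))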
services/dask-sidecar/tests/unit/test_file_utils.py (72 additions, 0 deletions)
@@ -3,6 +3,7 @@
 # pylint: disable=unused-variable

 import asyncio
+import hashlib
 import mimetypes
 import zipfile
 from collections.abc import AsyncIterable
@@ -375,3 +376,74 @@ async def test_pull_compressed_zip_file_from_remote(
         assert file.exists()
         assert file.name in file_names_within_zip_file
     mocked_log_publishing_cb.assert_called()
+
+
+def _compute_hash(file_path: Path) -> str:
+    with file_path.open("rb") as file_to_hash:
+        file_hash = hashlib.sha256()
+        chunk = file_to_hash.read(8192)
+        while chunk:
+            file_hash.update(chunk)
+            chunk = file_to_hash.read(8192)
+
+    return file_hash.hexdigest()
+
+
+async def test_push_file_to_remote_creates_reproducible_zip_archive(
+    remote_parameters: StorageParameters,
+    tmp_path: Path,
+    faker: Faker,
+    mocked_log_publishing_cb: mock.AsyncMock,
+):
+    destination_url1 = parse_obj_as(AnyUrl, f"{remote_parameters.remote_file_url}1.zip")
+    destination_url2 = parse_obj_as(AnyUrl, f"{remote_parameters.remote_file_url}2.zip")
+    src_path = tmp_path / faker.file_name()
+    TEXT_IN_FILE = faker.text()
+    src_path.write_text(TEXT_IN_FILE)
+    assert src_path.exists()
+
+    # pushing 2 times should produce the same archive with the same hash
+    await push_file_to_remote(
+        src_path,
+        destination_url1,
+        mocked_log_publishing_cb,
+        remote_parameters.s3_settings,
+    )
+    await asyncio.sleep(
+        5
+    )  # NOTE: we wait a bit to ensure the created zipfile has a different creation time (that is normally used for computing the hash)
+    await push_file_to_remote(
+        src_path,
+        destination_url2,
+        mocked_log_publishing_cb,
+        remote_parameters.s3_settings,
+    )
+
+    # now we pull both file and compare their hash
+
+    # USE-CASE 1: if destination is a zip then no decompression is done
+    download_folder = tmp_path / "download"
+    download_folder.mkdir(parents=True, exist_ok=True)
+    assert download_folder.exists()
+    dst_path1 = download_folder / f"{faker.file_name()}1.zip"
+    dst_path2 = download_folder / f"{faker.file_name()}2.zip"
+
+    await pull_file_from_remote(
+        src_url=destination_url1,
+        target_mime_type=None,
+        dst_path=dst_path1,
+        log_publishing_cb=mocked_log_publishing_cb,
+        s3_settings=remote_parameters.s3_settings,
+    )
+    assert dst_path1.exists()
+
+    await pull_file_from_remote(
+        src_url=destination_url2,
+        target_mime_type=None,
+        dst_path=dst_path2,
+        log_publishing_cb=mocked_log_publishing_cb,
+        s3_settings=remote_parameters.s3_settings,
+    )
+    assert dst_path2.exists()
+
+    assert _compute_hash(dst_path1) == _compute_hash(dst_path2)
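
Side note on _compute_hash: the manual 8 KiB read loop is the classic portable pattern; on Python 3.11+ the standard library can do the chunked hashing in one call via hashlib.file_digest. A possible simplification, not what this PR uses:

    import hashlib
    from pathlib import Path


    def compute_hash(file_path: Path) -> str:
        # hashlib.file_digest (Python 3.11+) handles the chunked reading internally
        with file_path.open("rb") as file_to_hash:
            return hashlib.file_digest(file_to_hash, "sha256").hexdigest()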