From dff2a85c6145f3f7240cf6f8e05ae3b3f76d878f Mon Sep 17 00:00:00 2001
From: Nils Braun
Date: Tue, 5 Nov 2024 10:16:50 +0100
Subject: [PATCH 1/2] Fix a bug in the _upload_file_part_concurrent method for
 certain file sizes

---
 s3fs/core.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index f3dc411d..3b9ed995 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -1285,7 +1285,7 @@ async def _upload_chunk(chunk, part_number):
                     )
                 )
             else:
-                out.append(await _upload_chunk(chunk, len(out) + 1))
+                out.append(await _upload_chunk(chunks[0], len(out) + 1))
         return out
 
     async def _get_file(

From bedae4a2960878b2b33a0d40c44ea9da00620079 Mon Sep 17 00:00:00 2001
From: Nils Braun
Date: Wed, 6 Nov 2024 16:41:31 +0100
Subject: [PATCH 2/2] Added test against file truncation and only use a single
 code path

---
 s3fs/core.py            | 17 +++++++----------
 s3fs/tests/test_s3fs.py | 18 ++++++++++++++++++
 2 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 3b9ed995..9b9a3f57 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -1275,17 +1275,14 @@ async def _upload_chunk(chunk, part_number):
                     chunks.append(chunk)
             if not chunks:
                 break
-            if len(chunks) > 1:
-                out.extend(
-                    await asyncio.gather(
-                        *[
-                            _upload_chunk(chunk, len(out) + i)
-                            for i, chunk in enumerate(chunks, 1)
-                        ]
-                    )
+            out.extend(
+                await asyncio.gather(
+                    *[
+                        _upload_chunk(chunk, len(out) + i)
+                        for i, chunk in enumerate(chunks, 1)
+                    ]
                 )
-            else:
-                out.append(await _upload_chunk(chunks[0], len(out) + 1))
+            )
         return out
 
     async def _get_file(

diff --git a/s3fs/tests/test_s3fs.py b/s3fs/tests/test_s3fs.py
index 3379da52..b64907d1 100644
--- a/s3fs/tests/test_s3fs.py
+++ b/s3fs/tests/test_s3fs.py
@@ -1017,6 +1017,24 @@ def test_put_file_with_callback(s3, tmpdir, size):
     assert cb.size == os.stat(test_file).st_size
     assert cb.value == cb.size
 
+    assert s3.size(test_bucket_name + "/temp") == 11 * size
+
+
+@pytest.mark.parametrize("factor", [1, 5, 6])
+def test_put_file_does_not_truncate(s3, tmpdir, factor):
+    test_file = str(tmpdir.join("test.json"))
+
+    chunksize = 5 * 2**20
+    block = b"x" * chunksize
+
+    with open(test_file, "wb") as f:
+        f.write(block * factor)
+
+    s3.put_file(
+        test_file, test_bucket_name + "/temp", max_concurrency=5, chunksize=chunksize
+    )
+    assert s3.size(test_bucket_name + "/temp") == factor * chunksize
+
 
 @pytest.mark.parametrize("size", [2**10, 2**20, 10 * 2**20])
 def test_pipe_cat_big(s3, size):
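
Note (annotation, not part of the patches above): the truncation the new test guards
against comes from the old single-chunk branch reusing the read-loop variable `chunk`.
After the inner read loop, `chunk` holds the last value returned by `f0.read()`, which
is empty bytes whenever the file runs out partway through a batch, rather than the one
chunk actually collected in `chunks`. The sketch below is a minimal, standalone model
of that control flow, using plain byte buffers instead of S3 uploads; `read_batches_old`
and the constants are illustrative names, not s3fs APIs. It shows how one full chunk
can be dropped.

import io

CHUNK = 5          # stand-in for the 5 MiB multipart chunksize used in the test
CONCURRENCY = 5    # stand-in for max_concurrency=5


def read_batches_old(f, chunksize=CHUNK, max_concurrency=CONCURRENCY):
    """Model of the pre-fix batching loop: returns the parts that would be uploaded."""
    parts = []
    while True:
        chunks = []
        for _ in range(max_concurrency):
            chunk = f.read(chunksize)
            if chunk:
                chunks.append(chunk)
        if not chunks:
            break
        if len(chunks) > 1:
            parts.extend(chunks)  # concurrent path: every collected chunk is kept
        else:
            # Bug modelled here: `chunk` is the last read result, which is b""
            # when the file ended during this batch, so the real data sitting
            # in chunks[0] is silently replaced by an empty part.
            parts.append(chunk)
    return parts


# A 6-chunk file with a 5-chunk batch (the factor=6 case in the new test):
# the second batch holds exactly one real chunk, but the old branch appends
# b"" instead, so the "uploaded" size comes out one chunk short.
data = b"x" * (6 * CHUNK)
parts = read_batches_old(io.BytesIO(data))
assert sum(len(p) for p in parts) == 5 * CHUNK  # truncated: 25 bytes instead of 30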