Skip to content

Commit

Permalink
Merge pull request #16735 from [BEAM-13827] - fix medium file size up…
Browse files Browse the repository at this point in the history
…load to s3

* fix medium file size upload

* fix tests

* take care of zerolength files as before

* zerolength files should raise an error?

* handle fake and real s3 clients

* simplified tests

* Revert "simplified tests"

Apparently, this did not work as I expected

This reverts commit f49b2b4.

* formatting

* linting

* refactor test

* comments

* fix: [BEAM-13856]
  • Loading branch information
hoshimura authored Feb 15, 2022
1 parent 69a8567 commit b3de798
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/io/aws/s3io.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,13 @@ def _write_to_s3(self, data):
raise

def finish(self):

self._write_to_s3(self.buffer)
if len(self.buffer) > 0:
# writes with zero length or mid size files between
# MIN_WRITE_SIZE = 5 * 1024 * 1024
# MAX_WRITE_SIZE = 5 * 1024 * 1024 * 1024
# as we will reach this finish() with len(self.buffer) == 0
# which will fail
self._write_to_s3(self.buffer)

if self.last_error is not None:
raise self.last_error # pylint: disable=raising-bad-type
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/io/aws/s3io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,22 @@ def test_list_prefix(self):
for (object_name, size) in objects:
self.aws.delete(self.TEST_DATA_PATH + object_name)

def test_midsize_file(self):
file_name = self.TEST_DATA_PATH + 'midsized'
file_size = 6 * 1024 * 1024
self._insert_random_file(self.aws.client, file_name, file_size)
with self.aws.open(file_name, 'r') as f:
self.assertEqual(len(f.read()), file_size)
self.aws.delete(file_name)

def test_zerosize_file(self):
file_name = self.TEST_DATA_PATH + 'zerosized'
file_size = 0
self._insert_random_file(self.aws.client, file_name, file_size)
with self.aws.open(file_name, 'r') as f:
self.assertEqual(len(f.read()), file_size)
self.aws.delete(file_name)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down

0 comments on commit b3de798

Please sign in to comment.