Merge pull request #17380 from [BEAM-14314][BEAM-9532] Add last_updated field in filesystem.FileMetaData

* [BEAM-14314] Add last_updated field in filesystem.FileMetaData

* Add last_updated_in_seconds field in FileMetadata and use it in match

* Add metadata method for FileSystem implementations

* Fix precommit

* Fix style and docstring

* Increase test coverage for metadata methods

* Address comments and improve last_updated metadata handling

* Fix naming

* Implement last_updated in hadoop filesystem

* [BEAM-9532] Fix issue with s3io_test last_updated

* Fix pylint and add back removed coverage

* Fix hadoopfilesystem unit test

* Address comments and fix flaky hadoopfilesystem unit test

* Add comments to time assertion choice in test

* Fix method _status renaming leftover

* Also fix method _status renaming leftover in tests
Abacn authored May 2, 2022
1 parent 095190d commit 0daef62
Showing 18 changed files with 522 additions and 184 deletions.
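
For orientation, here is a minimal, hypothetical usage sketch of what this change surfaces to pipeline authors. The bucket and pattern are invented; the field name last_updated_in_seconds is taken from the commit messages above.

from apache_beam.io.filesystems import FileSystems

# match() returns MatchResult objects whose metadata_list entries are
# FileMetadata records; with this change they also carry a last-updated
# timestamp (in epoch seconds) on filesystems that support it.
for match_result in FileSystems.match(['s3://my-bucket/logs/*']):
    for metadata in match_result.metadata_list:
        print(
            metadata.path,
            metadata.size_in_bytes,
            metadata.last_updated_in_seconds)
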
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/io/aws/clients/s3/fake_client.py
@@ -20,6 +20,8 @@
import datetime
import time

import pytz

from apache_beam.io.aws.clients.s3 import messages


@@ -39,8 +41,8 @@ def __init__(self, bucket, key, contents, etag=None):
def get_metadata(self):
last_modified_datetime = None
if self.last_modified:
last_modified_datetime = datetime.datetime.utcfromtimestamp(
self.last_modified)
last_modified_datetime = datetime.datetime.fromtimestamp(
self.last_modified, pytz.utc)

return messages.Item(
self.etag,
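
As context for the fake_client.py change above: utcfromtimestamp returns a naive datetime, while fromtimestamp(..., pytz.utc) returns a timezone-aware one. A small standalone illustration, using an arbitrary timestamp:

import datetime

import pytz

ts = 1651500000.0  # arbitrary epoch seconds

naive = datetime.datetime.utcfromtimestamp(ts)          # tzinfo is None
aware = datetime.datetime.fromtimestamp(ts, pytz.utc)   # tzinfo is UTC

print(naive.tzinfo)             # None
print(aware.tzinfo)             # UTC
print(aware.timestamp() == ts)  # True; aware datetimes round-trip unambiguously
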
25 changes: 22 additions & 3 deletions sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -128,9 +128,9 @@ def _list(self, dir_or_prefix):
``BeamIOError``: if listing fails, but not if no files were found.
"""
try:
for path, size in \
s3io.S3IO(options=self._options).list_prefix(dir_or_prefix).items():
yield FileMetadata(path, size)
for path, (size, updated) in s3io.S3IO(options=self._options) \
.list_prefix(dir_or_prefix, with_metadata=True).items():
yield FileMetadata(path, size, updated)
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("List operation failed", {dir_or_prefix: e})

@@ -281,6 +281,25 @@ def checksum(self, path):
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Checksum operation failed", {path: e})

def metadata(self, path):
"""Fetch metadata fields of a file on the FileSystem.
Args:
path: string path of a file.
Returns:
:class:`~apache_beam.io.filesystem.FileMetadata`.
Raises:
``BeamIOError``: if path isn't a file or doesn't exist.
"""
try:
file_metadata = s3io.S3IO(options=self._options)._status(path)
return FileMetadata(
path, file_metadata['size'], file_metadata['last_updated'])
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Metadata operation failed", {path: e})

def delete(self, paths):
"""Deletes files or directories at the provided paths.
Directories will be deleted recursively.
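
A hedged sketch of calling the new metadata() method directly; the path is hypothetical, and AWS credentials are assumed to be configured via the usual pipeline options.

from apache_beam.io.aws.s3filesystem import S3FileSystem
from apache_beam.options.pipeline_options import PipelineOptions

fs = S3FileSystem(PipelineOptions())
# metadata() returns a FileMetadata record for a single object.
file_metadata = fs.metadata('s3://my-bucket/data/part-00000')  # hypothetical path
print(file_metadata.path)
print(file_metadata.size_in_bytes)
print(file_metadata.last_updated_in_seconds)
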
40 changes: 28 additions & 12 deletions sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -78,35 +78,50 @@ def test_split(self):
with self.assertRaises(ValueError):
self.fs.split('/no/s3/prefix')

@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_single(self, unused_mock_arg):
# Prepare mocks.
s3io_mock = mock.MagicMock()
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
s3io_mock._status.return_value = {'size': 1, 'last_updated': 9999999.0}
expected_results = [FileMetadata('s3://bucket/file1', 1, 9999999.0)]
match_result = self.fs.match(['s3://bucket/file1'])[0]

self.assertEqual(match_result.metadata_list, expected_results)
s3io_mock._status.assert_called_once_with('s3://bucket/file1')

@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiples(self, unused_mock_arg):
# Prepare mocks.
s3io_mock = mock.MagicMock()
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
s3io_mock.list_prefix.return_value = {
's3://bucket/file1': 1, 's3://bucket/file2': 2
's3://bucket/file1': (1, 9999999.0),
's3://bucket/file2': (2, 8888888.0)
}
expected_results = set([
FileMetadata('s3://bucket/file1', 1),
FileMetadata('s3://bucket/file2', 2)
FileMetadata('s3://bucket/file1', 1, 9999999.0),
FileMetadata('s3://bucket/file2', 2, 8888888.0)
])
match_result = self.fs.match(['s3://bucket/'])[0]

self.assertEqual(set(match_result.metadata_list), expected_results)
s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
s3io_mock.list_prefix.assert_called_once_with(
's3://bucket/', with_metadata=True)

@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiples_limit(self, unused_mock_arg):
# Prepare mocks.
s3io_mock = mock.MagicMock()
limit = 1
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
s3io_mock.list_prefix.return_value = {'s3://bucket/file1': 1}
expected_results = set([FileMetadata('s3://bucket/file1', 1)])
s3io_mock.list_prefix.return_value = {'s3://bucket/file1': (1, 99999.0)}
expected_results = set([FileMetadata('s3://bucket/file1', 1, 99999.0)])
match_result = self.fs.match(['s3://bucket/'], [limit])[0]
self.assertEqual(set(match_result.metadata_list), expected_results)
self.assertEqual(len(match_result.metadata_list), limit)
s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
s3io_mock.list_prefix.assert_called_once_with(
's3://bucket/', with_metadata=True)

@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiples_error(self, unused_mock_arg):
@@ -120,7 +135,8 @@ def test_match_multiples_error(self, unused_mock_arg):
self.fs.match(['s3://bucket/'])

self.assertIn('Match operation failed', str(error.exception))
s3io_mock.list_prefix.assert_called_once_with('s3://bucket/')
s3io_mock.list_prefix.assert_called_once_with(
's3://bucket/', with_metadata=True)

@mock.patch('apache_beam.io.aws.s3filesystem.s3io')
def test_match_multiple_patterns(self, unused_mock_arg):
@@ -129,14 +145,14 @@
s3filesystem.s3io.S3IO = lambda options: s3io_mock # type: ignore[misc]
s3io_mock.list_prefix.side_effect = [
{
's3://bucket/file1': 1
's3://bucket/file1': (1, 99999.0)
},
{
's3://bucket/file2': 2
's3://bucket/file2': (2, 88888.0)
},
]
expected_results = [[FileMetadata('s3://bucket/file1', 1)],
[FileMetadata('s3://bucket/file2', 2)]]
expected_results = [[FileMetadata('s3://bucket/file1', 1, 99999.0)],
[FileMetadata('s3://bucket/file2', 2, 88888.0)]]
result = self.fs.match(['s3://bucket/file1*', 's3://bucket/file2*'])
self.assertEqual([mr.metadata_list for mr in result], expected_results)

98 changes: 68 additions & 30 deletions sdks/python/apache_beam/io/aws/s3io.py
@@ -101,23 +101,28 @@ def open(

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def list_prefix(self, path):
def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.
Args:
path: S3 file path pattern in the form s3://<bucket>/[name].
with_metadata: Experimental. Specify whether to return file metadata.
Returns:
Dictionary of file name -> size.
If ``with_metadata`` is False: dict of file name -> size; if
``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
bucket, prefix = parse_s3_path(path, object_optional=True)
request = messages.ListRequest(bucket=bucket, prefix=prefix)

file_sizes = {}
file_info = {}
counter = 0
start_time = time.time()

logging.info("Starting the size estimation of the input")
if with_metadata:
logging.info("Starting the file information of the input")
else:
logging.info("Starting the size estimation of the input")

while True:
#The list operation will raise an exception
@@ -134,10 +139,19 @@ def list_prefix(self, path):

for item in response.items:
file_name = 's3://%s/%s' % (bucket, item.key)
file_sizes[file_name] = item.size
if with_metadata:
file_info[file_name] = (
item.size, self._updated_to_seconds(item.last_modified))
else:
file_info[file_name] = item.size
counter += 1
if counter % 10000 == 0:
logging.info("Finished computing size of: %s files", len(file_sizes))
if with_metadata:
logging.info(
"Finished computing file information of: %s files",
len(file_info))
else:
logging.info("Finished computing size of: %s files", len(file_info))
if response.next_token:
request.continuation_token = response.next_token
else:
@@ -148,20 +162,15 @@ def list_prefix(self, path):
counter,
time.time() - start_time)

return file_sizes
return file_info

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def checksum(self, path):
"""Looks up the checksum of an S3 object.
Args:
path: S3 file path pattern in the form s3://<bucket>/<name>.
"""
bucket, object_path = parse_s3_path(path)
request = messages.GetRequest(bucket, object_path)
item = self.client.get_object_metadata(request)
return item.etag
return self._s3_object(path).etag

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
@@ -400,8 +409,6 @@ def delete_tree(self, root):
paths = self.list_prefix(root)
return self.delete_files(paths)

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def size(self, path):
"""Returns the size of a single S3 object.
@@ -410,10 +417,7 @@ def size(self, path):
Returns: size of the S3 object in bytes.
"""
bucket, object_path = parse_s3_path(path)
request = messages.GetRequest(bucket, object_path)
item = self.client.get_object_metadata(request)
return item.size
return self._s3_object(path).size

# We intentionally do not decorate this method with a retry, since the
# underlying copy and delete operations are already idempotent operations
@@ -428,8 +432,6 @@ def rename(self, src, dest):
self.copy(src, dest)
self.delete(src)

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def last_updated(self, path):
"""Returns the last updated epoch time of a single S3 object.
@@ -438,23 +440,16 @@ def last_updated(self, path):
Returns: last updated time of the S3 object in second.
"""
bucket, object = parse_s3_path(path)
request = messages.GetRequest(bucket, object)
datetime = self.client.get_object_metadata(request).last_modified
return (
time.mktime(datetime.timetuple()) - time.timezone +
datetime.microsecond / 1000000.0)
return self._updated_to_seconds(self._s3_object(path).last_modified)

def exists(self, path):
"""Returns whether the given S3 object exists.
Args:
path: S3 file path pattern in the form s3://<bucket>/<name>.
"""
bucket, object = parse_s3_path(path)
request = messages.GetRequest(bucket, object)
try:
self.client.get_object_metadata(request)
self._s3_object(path)
return True
except messages.S3ClientError as e:
if e.code == 404:
@@ -464,6 +459,49 @@ def exists(self, path):
# We re-raise all other exceptions
raise

def _status(self, path):
"""For internal use only; no backwards-compatibility guarantees.
Returns supported fields (checksum, last_updated, size) of a single object
as a dict at once.
This method does not perform glob expansion. Hence the given path must be
for a single S3 object.
Returns: dict of fields of the S3 object.
"""
s3_object = self._s3_object(path)
file_status = {}
if hasattr(s3_object, 'etag'):
file_status['checksum'] = s3_object.etag
if hasattr(s3_object, 'last_modified'):
file_status['last_updated'] = self._updated_to_seconds(
s3_object.last_modified)
if hasattr(s3_object, 'size'):
file_status['size'] = s3_object.size
return file_status

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def _s3_object(self, path):
"""Returns a S3 object metadata for the given path
This method does not perform glob expansion. Hence the given path must be
for a single S3 object.
Returns: S3 object metadata.
"""
bucket, object = parse_s3_path(path)
request = messages.GetRequest(bucket, object)
return self.client.get_object_metadata(request)

@staticmethod
def _updated_to_seconds(updated):
"""Helper function transform the updated field of response to seconds"""
return (
time.mktime(updated.timetuple()) - time.timezone +
updated.microsecond / 1000000.0)

def rename_files(self, src_dest_pairs):
"""Renames the given S3 objects from src to dest.
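
As a rough illustration of the arithmetic in the new _updated_to_seconds helper, here is a standalone replica for explanation only; the exact result depends on the local timezone settings, since time.mktime interprets the tuple as local time.

import datetime
import time

def updated_to_seconds(updated):
    # Same arithmetic as _updated_to_seconds above: mktime() treats the
    # struct_time as local time, so time.timezone shifts it back toward UTC,
    # and the microseconds dropped by timetuple() are re-added.
    return (
        time.mktime(updated.timetuple()) - time.timezone +
        updated.microsecond / 1000000.0)

# Example: a last_modified value as an S3 client might return it.
last_modified = datetime.datetime(2022, 5, 2, 12, 30, 15, 250000)
print(updated_to_seconds(last_modified))
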
21 changes: 18 additions & 3 deletions sdks/python/apache_beam/io/aws/s3io_test.py
@@ -113,13 +113,13 @@ def test_size(self):
self.aws.delete(file_name)

def test_last_updated(self):
self.skipTest('BEAM-9532 fix issue with s3 last updated')
file_name = self.TEST_DATA_PATH + 'dummy_file'
file_size = 1234

self._insert_random_file(self.client, file_name, file_size)
self.assertTrue(self.aws.exists(file_name))

# The time difference should be tiny for the mock client.
# A loose tolerance is for the consideration of real s3 client.
tolerance = 5 * 60 # 5 mins
result = self.aws.last_updated(file_name)
self.assertAlmostEqual(result, time.time(), delta=tolerance)
Expand All @@ -128,7 +128,6 @@ def test_last_updated(self):
self.aws.delete(file_name)

def test_checksum(self):

file_name = self.TEST_DATA_PATH + 'checksum'
file_size = 1024
file_ = self._insert_random_file(self.client, file_name, file_size)
@@ -149,6 +148,22 @@ def test_checksum(self):
# Clean up
self.aws.delete(file_name)

def test_file_status(self):
file_name = self.TEST_DATA_PATH + 'metadata'
file_size = 1024
self._insert_random_file(self.client, file_name, file_size)
file_checksum = self.aws.checksum(file_name)
file_timestamp = self.aws.last_updated(file_name)

file_status = self.aws._status(file_name)

self.assertEqual(file_status['size'], file_size)
self.assertEqual(file_status['checksum'], file_checksum)
self.assertEqual(file_status['last_updated'], file_timestamp)

# Clean up
self.aws.delete(file_name)

def test_copy(self):
src_file_name = self.TEST_DATA_PATH + 'source'
dest_file_name = self.TEST_DATA_PATH + 'dest'
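
A hedged sketch of the dict shape that test_file_status above verifies, using the internal _status helper directly; the path is hypothetical and AWS credentials are assumed to be available in the environment.

from apache_beam.io.aws import s3io
from apache_beam.options.pipeline_options import PipelineOptions

aws = s3io.S3IO(options=PipelineOptions())
status = aws._status('s3://my-bucket/data/part-00000')  # hypothetical path
# Expected keys, per the test above:
# {'checksum': <etag string>, 'last_updated': <epoch seconds>, 'size': <bytes>}
print(status)
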
(Diffs for the remaining changed files are not shown.)
