Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix BlobReader handling of interleaved reads and seeks #721

Merged
merged 3 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions google/cloud/storage/fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,12 @@ def read(self, size=-1):
# If the read request demands more bytes than are buffered, fetch more.
remaining_size = size - len(result)
if remaining_size > 0 or size < 0:
self._pos += self._buffer.tell()
read_size = len(result)

self._buffer.seek(0)
self._buffer.truncate(0) # Clear the buffer to make way for new data.
fetch_start = self._pos + len(result)
fetch_start = self._pos
if size > 0:
# Fetch the larger of self._chunk_size or the remaining_size.
fetch_end = fetch_start + max(remaining_size, self._chunk_size)
Expand Down Expand Up @@ -154,9 +157,8 @@ def read(self, size=-1):
self._buffer.write(result[size:])
self._buffer.seek(0)
result = result[:size]

self._pos += len(result)

# Increment relative offset by true amount read.
self._pos += len(result) - read_size
return result

def read1(self, size=-1):
Expand All @@ -174,29 +176,33 @@ def seek(self, pos, whence=0):
if self._blob.size is None:
self._blob.reload(**self._download_kwargs)

initial_pos = self._pos
initial_offset = self._pos + self._buffer.tell()

if whence == 0:
self._pos = pos
target_pos = pos
elif whence == 1:
self._pos += pos
target_pos = initial_offset + pos
elif whence == 2:
self._pos = self._blob.size + pos
target_pos = self._blob.size + pos
if whence not in {0, 1, 2}:
raise ValueError("invalid whence value")

if self._pos > self._blob.size:
self._pos = self._blob.size
if target_pos > self._blob.size:
target_pos = self._blob.size

# Seek or invalidate buffer as needed.
difference = self._pos - initial_pos
new_buffer_pos = self._buffer.seek(difference, 1)
if new_buffer_pos != difference: # Buffer does not contain new pos.
# Invalidate buffer.
if target_pos < self._pos:
# Target position < relative offset <= true offset.
# As data is not in buffer, invalidate buffer.
self._buffer.seek(0)
self._buffer.truncate(0)

return self._pos
new_pos = target_pos
self._pos = target_pos
else:
# relative offset <= target position <= size of file.
difference = target_pos - initial_offset
new_pos = self._pos + self._buffer.seek(difference, 1)
return new_pos

def close(self):
self._buffer.close()
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/test_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,41 @@ def initialize_size(**_):

reader.close()

def test_advanced_seek(self):
    """Interleave reads and seeks the way ``tarfile`` does and check ``tell()``."""
    blob = mock.Mock()

    def fake_download(start=0, end=None, **_):
        # Stand-in for the blob download: the requested slice, repeated.
        return TEST_BINARY_DATA[start:end] * 1024

    def populate_size(**_):
        # Side effect for reload(): the blob size only becomes known here.
        blob.size = len(TEST_BINARY_DATA) * 1024

    blob.size = None
    blob.download_as_bytes = mock.Mock(side_effect=fake_download)
    reader = self._make_blob_reader(
        blob, chunk_size=1024, if_metageneration_match=1
    )

    # seek() cannot work without the blob size and is expected to call
    # reload() when the size is unknown; the mock fills it in on demand.
    blob.reload = mock.Mock(side_effect=populate_size)

    self.assertEqual(reader.tell(), 0)

    # Step 1 of the tarfile-like pattern: read a tarinfo header block.
    reader.read(512)
    self.assertEqual(reader.tell(), 512)
    self.assertEqual(reader.seek(512), 512)

    # Step 2: read the member's actual content.
    reader.read(400)
    self.assertEqual(reader.tell(), 912)

    # Step 3: tarfile rounds offsets up to the block size, then does a
    # one-byte sanity seek/read to detect an unexpected end of data.
    reader.seek(1023)
    reader.read(1)
    self.assertEqual(reader.tell(), 1024)

    # Step 4: continue reading past the sanity-checked position.
    reader.read(512)
    self.assertEqual(reader.tell(), 1536)

    reader.close()

def test_close(self):
blob = mock.Mock()
reader = self._make_blob_reader(blob)
Expand Down