Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: decompress into array #1848

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions asdf/_block/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def read_block_header(fd, offset=None):
return validate_block_header(header)


def read_block_data(fd, header, offset=None, memmap=False):
def read_block_data(fd, header, offset=None, memmap=False, out=None):
"""
Read (or memory map) data for an ASDF block.

Expand All @@ -127,6 +127,9 @@ def read_block_data(fd, header, offset=None, memmap=False):
does not support memmapping the data will not be memmapped (and
no error will be raised).

out : ndarray, optional
Destination array for read block data.

Returns
-------
data : ndarray or memmap
Expand All @@ -148,14 +151,14 @@ def read_block_data(fd, header, offset=None, memmap=False):
compression = mcompression.validate(header["compression"])
if compression:
# compressed data will not be memmapped
data = mcompression.decompress(fd, used_size, header["data_size"], compression)
data = mcompression.decompress(fd, used_size, header["data_size"], compression, out=out)
fd.fast_forward(header["allocated_size"] - header["used_size"])
else:
if memmap and fd.can_memmap():
data = fd.memmap_array(offset, used_size)
ff_bytes = header["allocated_size"]
else:
data = fd.read_into_array(used_size)
data = fd.read_into_array(used_size, out=out)
ff_bytes = header["allocated_size"] - header["used_size"]
if (header["flags"] & constants.BLOCK_FLAG_STREAMED) and fd.seekable():
fd.seek(0, os.SEEK_END)
Expand All @@ -164,7 +167,7 @@ def read_block_data(fd, header, offset=None, memmap=False):
return data


def read_block(fd, offset=None, memmap=False, lazy_load=False):
def read_block(fd, offset=None, memmap=False, lazy_load=False, out=None):
"""
Read a block (header and data) from an ASDF file.

Expand All @@ -187,6 +190,9 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False):
Return a callable that when called will read the block data. This
option is ignored for a non-seekable file.

out : ndarray, optional
Destination array for read block data.

Returns
-------
offset : int
Expand Down Expand Up @@ -216,13 +222,15 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False):
# setup a callback to later load the data
fd_ref = weakref.ref(fd)

def callback():
def callback(out=None):
# `out` here can differ from the `out` passed to read_block, since this
# callback might be called at a later point
fd = fd_ref()
if fd is None or fd.is_closed():
msg = "ASDF file has already been closed. Can not get the data."
raise OSError(msg)
position = fd.tell()
data = read_block_data(fd, header, offset=data_offset, memmap=memmap)
data = read_block_data(fd, header, offset=data_offset, memmap=memmap, out=out)
fd.seek(position)
return data

Expand All @@ -232,7 +240,7 @@ def callback():
else:
fd.fast_forward(header["allocated_size"])
else:
data = read_block_data(fd, header, offset=None, memmap=memmap)
data = read_block_data(fd, header, offset=None, memmap=memmap, out=out)
return offset, header, data_offset, data


Expand Down
19 changes: 16 additions & 3 deletions asdf/_block/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,37 @@
def loaded(self):
return self._data is not None

def load(self):
def load(self, out=None):
"""
Load the block data (if it is not already loaded).

Parameters
----------
out : ndarray, optional
Destination array for read block data.

Raises
------
OSError
If attempting to load from a closed file.
"""
if self.loaded:
if self.loaded and out is None:
return
if out is not None:
# FIXME document this: providing `out` disables lazy loading and memmapping
lazy_load = False
memmap = False

Check warning on line 55 in asdf/_block/reader.py

View check run for this annotation

Codecov / codecov/patch

asdf/_block/reader.py#L54-L55

Added lines #L54 - L55 were not covered by tests
else:
lazy_load = self.lazy_load
memmap = self.memmap
fd = self._fd()
if fd is None or fd.is_closed():
msg = "Attempt to load block from closed file"
raise OSError(msg)
position = fd.tell()
# FIXME this re-reads the header every time
_, self._header, self.data_offset, self._data = bio.read_block(
fd, offset=self.offset, memmap=self.memmap, lazy_load=self.lazy_load
fd, offset=self.offset, memmap=memmap, lazy_load=lazy_load, out=out
)
fd.seek(position)

Expand Down
12 changes: 8 additions & 4 deletions asdf/_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def to_compression_header(compression):
return compression


def decompress(fd, used_size, data_size, compression, config=None):
def decompress(fd, used_size, data_size, compression, config=None, out=None):
"""
Decompress binary data in a file

Expand All @@ -257,26 +257,30 @@ def decompress(fd, used_size, data_size, compression, config=None):
Any kwarg parameters to pass to the underlying decompression
function

out : numpy.array, optional
Array in which to store decompressed data

Returns
-------
array : numpy.array
A flat uint8 array containing the decompressed data.
"""
buffer = np.empty((data_size,), np.uint8)
if out is None:
out = np.empty((data_size,), np.uint8)

compression = validate(compression)
decoder = _get_compressor(compression)
if config is None:
config = {}

blocks = fd.read_blocks(used_size) # data is a generator
len_decoded = decoder.decompress(blocks, out=buffer.data, **config)
len_decoded = decoder.decompress(blocks, out=out.data, **config)

if len_decoded != data_size:
msg = "Decompressed data wrong size"
raise ValueError(msg)

return buffer
return out


def compress(fd, data, compression, config=None):
Expand Down
28 changes: 21 additions & 7 deletions asdf/generic_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@
msg = f"memmapping is not implemented for {self.__class__.__name__}"
raise NotImplementedError(msg)

def read_into_array(self, size):
def read_into_array(self, size, out=None):
"""
Read a chunk of the file into a uint8 array.

Expand All @@ -683,7 +683,10 @@
array : np.ndarray
"""
buff = self.read(size)
return np.frombuffer(buff, np.uint8, size, 0)
if out is None:
return np.frombuffer(buff, np.uint8, size, 0)
out[:] = np.frombuffer(buff, np.uint8, size, 0)
return out

Check warning on line 689 in asdf/generic_io.py

View check run for this annotation

Codecov / codecov/patch

asdf/generic_io.py#L688-L689

Added lines #L688 - L689 were not covered by tests


class GenericWrapper:
Expand Down Expand Up @@ -794,7 +797,13 @@
if hasattr(self, "_mmap"):
self._mmap.flush()

def read_into_array(self, size):
def read_into_array(self, size, out=None):
if out is not None:
read_size = self._fd.readinto(out.data)
if read_size != size:

Check warning on line 803 in asdf/generic_io.py

View check run for this annotation

Codecov / codecov/patch

asdf/generic_io.py#L802-L803

Added lines #L802 - L803 were not covered by tests
# TODO better message here
raise OSError("Read inconsistency")
return out

Check warning on line 806 in asdf/generic_io.py

View check run for this annotation

Codecov / codecov/patch

asdf/generic_io.py#L805-L806

Added lines #L805 - L806 were not covered by tests
return np.fromfile(self._fd, dtype=np.uint8, count=size)

def _fix_permissions(self):
Expand Down Expand Up @@ -853,13 +862,17 @@
def __init__(self, fd, mode, uri=None):
super().__init__(fd, mode, uri=uri)

def read_into_array(self, size):
def read_into_array(self, size, out=None):
buf = self._fd.getvalue()
offset = self._fd.tell()
# TODO improve this
result = np.frombuffer(buf, np.uint8, size, offset)
# Copy the buffer so the original memory can be released.
result = result.copy()
self.seek(size, SEEK_CUR)
if out is not None:
out[:] = result

Check warning on line 872 in asdf/generic_io.py

View check run for this annotation

Codecov / codecov/patch

asdf/generic_io.py#L872

Added line #L872 was not covered by tests
else:
# Copy the buffer so the original memory can be released.
result = result.copy()
return result


Expand Down Expand Up @@ -937,7 +950,8 @@
msg = "Read past end of file"
raise OSError(msg)

def read_into_array(self, size):
def read_into_array(self, size, out=None):
# TODO support out... what becomes an InputStream?
try:
# See if Numpy can handle this as a real file first...
return np.fromfile(self._fd, np.uint8, size)
Expand Down
Loading