diff --git a/asdf/_block/io.py b/asdf/_block/io.py
index 3a42bed77..5ccd4d724 100644
--- a/asdf/_block/io.py
+++ b/asdf/_block/io.py
@@ -105,7 +105,7 @@ def read_block_header(fd, offset=None):
     return validate_block_header(header)
 
 
-def read_block_data(fd, header, offset=None, memmap=False):
+def read_block_data(fd, header, offset=None, memmap=False, out=None):
     """
     Read (or memory map) data for an ASDF block.
 
@@ -127,6 +127,9 @@ def read_block_data(fd, header, offset=None, memmap=False):
         does not support memmapping the data will not be
         memmapped (and no error will be raised).
 
+    out : ndarray, optional
+        Destination array for read block data.
+
     Returns
     -------
     data : ndarray or memmap
@@ -148,14 +151,14 @@ def read_block_data(fd, header, offset=None, memmap=False):
     compression = mcompression.validate(header["compression"])
     if compression:
         # compressed data will not be memmapped
-        data = mcompression.decompress(fd, used_size, header["data_size"], compression)
+        data = mcompression.decompress(fd, used_size, header["data_size"], compression, out=out)
         fd.fast_forward(header["allocated_size"] - header["used_size"])
     else:
         if memmap and fd.can_memmap():
             data = fd.memmap_array(offset, used_size)
             ff_bytes = header["allocated_size"]
         else:
-            data = fd.read_into_array(used_size)
+            data = fd.read_into_array(used_size, out=out)
             ff_bytes = header["allocated_size"] - header["used_size"]
     if (header["flags"] & constants.BLOCK_FLAG_STREAMED) and fd.seekable():
         fd.seek(0, os.SEEK_END)
@@ -164,7 +167,7 @@ def read_block_data(fd, header, offset=None, memmap=False):
     return data
 
 
-def read_block(fd, offset=None, memmap=False, lazy_load=False):
+def read_block(fd, offset=None, memmap=False, lazy_load=False, out=None):
     """
     Read a block (header and data) from an ASDF file.
 
@@ -187,6 +190,9 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False):
         Return a callable that when called will read the block data. This
         option is ignored for a non-seekable file.
 
+    out : ndarray, optional
+        Destination array for read block data.
+
     Returns
     -------
     offset : int
@@ -216,13 +222,15 @@ def read_block(fd, offset=None, memmap=False, lazy_load=False):
             # setup a callback to later load the data
             fd_ref = weakref.ref(fd)
 
-            def callback():
+            def callback(out=None):
+                # out here can be different since this callback might be called
+                # at a later point
                 fd = fd_ref()
                 if fd is None or fd.is_closed():
                     msg = "ASDF file has already been closed. Can not get the data."
                     raise OSError(msg)
                 position = fd.tell()
-                data = read_block_data(fd, header, offset=data_offset, memmap=memmap)
+                data = read_block_data(fd, header, offset=data_offset, memmap=memmap, out=out)
                 fd.seek(position)
                 return data
 
@@ -232,7 +240,7 @@ def callback():
         else:
             fd.fast_forward(header["allocated_size"])
     else:
-        data = read_block_data(fd, header, offset=None, memmap=memmap)
+        data = read_block_data(fd, header, offset=None, memmap=memmap, out=out)
 
     return offset, header, data_offset, data
 
diff --git a/asdf/_block/reader.py b/asdf/_block/reader.py
index 60abd9d34..c1a97ae0f 100644
--- a/asdf/_block/reader.py
+++ b/asdf/_block/reader.py
@@ -33,24 +33,37 @@ def close(self):
     def loaded(self):
         return self._data is not None
 
-    def load(self):
+    def load(self, out=None):
         """
         Load the block data (if it is not already loaded).
 
+        Parameters
+        ----------
+        out : ndarray, optional
+            Destination array for read block data.
+
         Raises
         ------
         OSError
             If attempting to load from a closed file.
 
         """
-        if self.loaded:
+        if self.loaded and out is None:
             return
+        if out is not None:
+            # a caller-provided "out" requires a real, non-lazy, non-memmapped read
+            lazy_load = False
+            memmap = False
+        else:
+            lazy_load = self.lazy_load
+            memmap = self.memmap
         fd = self._fd()
         if fd is None or fd.is_closed():
             msg = "Attempt to load block from closed file"
             raise OSError(msg)
         position = fd.tell()
+        # FIXME this re-reads the header every time
         _, self._header, self.data_offset, self._data = bio.read_block(
-            fd, offset=self.offset, memmap=self.memmap, lazy_load=self.lazy_load
+            fd, offset=self.offset, memmap=memmap, lazy_load=lazy_load, out=out
         )
         fd.seek(position)
diff --git a/asdf/_compression.py b/asdf/_compression.py
index 89076e313..91b3b1893 100644
--- a/asdf/_compression.py
+++ b/asdf/_compression.py
@@ -235,7 +235,7 @@ def to_compression_header(compression):
     return compression
 
 
-def decompress(fd, used_size, data_size, compression, config=None):
+def decompress(fd, used_size, data_size, compression, config=None, out=None):
     """
     Decompress binary data in a file
 
@@ -257,12 +257,16 @@ def decompress(fd, used_size, data_size, compression, config=None):
        Any kwarg parameters to pass to the underlying decompression function
 
+    out : numpy.array, optional
+        Array in which to store decompressed data
+
     Returns
     -------
     array : numpy.array
         A flat uint8 containing the decompressed data.
 
     """
-    buffer = np.empty((data_size,), np.uint8)
+    if out is None:
+        out = np.empty((data_size,), np.uint8)
 
     compression = validate(compression)
     decoder = _get_compressor(compression)
@@ -270,13 +274,13 @@
     if config is None:
         config = {}
     blocks = fd.read_blocks(used_size)  # data is a generator
-    len_decoded = decoder.decompress(blocks, out=buffer.data, **config)
+    len_decoded = decoder.decompress(blocks, out=out.data, **config)
 
     if len_decoded != data_size:
         msg = "Decompressed data wrong size"
         raise ValueError(msg)
 
-    return buffer
+    return out
 
 
 def compress(fd, data, compression, config=None):
diff --git a/asdf/generic_io.py b/asdf/generic_io.py
index 4114e104f..00545d0f9 100644
--- a/asdf/generic_io.py
+++ b/asdf/generic_io.py
@@ -669,7 +669,7 @@ def flush_memmap(self):
         msg = f"memmapping is not implemented for {self.__class__.__name__}"
         raise NotImplementedError(msg)
 
-    def read_into_array(self, size):
+    def read_into_array(self, size, out=None):
         """
         Read a chunk of the file into a uint8 array.
 
@@ -683,7 +683,10 @@ def read_into_array(self, size):
         array : np.memmap
         """
         buff = self.read(size)
-        return np.frombuffer(buff, np.uint8, size, 0)
+        if out is None:
+            return np.frombuffer(buff, np.uint8, size, 0)
+        out[:] = np.frombuffer(buff, np.uint8, size, 0)
+        return out
 
 
 class GenericWrapper:
@@ -794,7 +797,13 @@ def flush_memmap(self):
         if hasattr(self, "_mmap"):
             self._mmap.flush()
 
-    def read_into_array(self, size):
+    def read_into_array(self, size, out=None):
+        if out is not None:
+            read_size = self._fd.readinto(out.data)
+            if read_size != size:
+                msg = f"Expected to read {size} bytes but only read {read_size}"
+                raise OSError(msg)
+            return out
         return np.fromfile(self._fd, dtype=np.uint8, count=size)
 
     def _fix_permissions(self):
@@ -853,13 +862,17 @@ class MemoryIO(RandomAccessFile):
     def __init__(self, fd, mode, uri=None):
         super().__init__(fd, mode, uri=uri)
 
-    def read_into_array(self, size):
+    def read_into_array(self, size, out=None):
         buf = self._fd.getvalue()
         offset = self._fd.tell()
         result = np.frombuffer(buf, np.uint8, size, offset)
-        # Copy the buffer so the original memory can be released.
-        result = result.copy()
         self.seek(size, SEEK_CUR)
-        return result
+        if out is not None:
+            # fill and return the caller-provided array, consistent with
+            # the other read_into_array implementations
+            out[:] = result
+            return out
+        # Copy the buffer so the original memory can be released.
+        return result.copy()
 
 
@@ -937,7 +950,8 @@ def fast_forward(self, size):
             msg = "Read past end of file"
             raise OSError(msg)
 
-    def read_into_array(self, size):
+    def read_into_array(self, size, out=None):
+        # FIXME: "out" is ignored here; data is always read into a new array
        try:
             # See if Numpy can handle this as a real file first...
             return np.fromfile(self._fd, np.uint8, size)