LRU cache for decoded chunks #306

Closed · wants to merge 56 commits
Commits

d62febb  first attempt at chunk_cache layer (shikharsg, Aug 29, 2018)
f796ea7  ChunkCache test with MockChunkCacheArray (shikharsg, Aug 29, 2018)
32141a9  np.copy not needed when accessing a subset of a chunk (shikharsg, Oct 9, 2018)
b35139b  fixed 'Mm' dtype error for buffersize function (shikharsg, Oct 13, 2018)
3c45176  renamed ChunkCache to LRUChunkCache (Oct 13, 2018)
46dcf94  LRUChunkCache in zarr root namespace (Oct 13, 2018)
c69c751  LRUChunkCache example (Oct 13, 2018)
2cb143e  write caching of chunk should be done after encoding (Oct 15, 2018)
2fb169e  ensure cached chunk has been round tripped through encode-decode if d… (Oct 15, 2018)
31e4dfb  flake8 fixes (Oct 15, 2018)
5559c4f  read write cache for 0-d arrays (Oct 15, 2018)
2a0124a  added tutorial and api docs (Oct 15, 2018)
6fac2ad  separated store tests from mutable mapping tests in test_storage.py (shikharsg, Oct 20, 2018)
4e79d0b  fixed pickle, __delitem__ and ordered dict iteration bugs (shikharsg, Oct 20, 2018)
5fd6fc8  documenting slowdown when using write cache with object arrays (shikharsg, Oct 20, 2018)
422f9eb  factoring out mapping code from LRUStoreCache and LRUChunkCache (shikharsg, Oct 23, 2018)
44cea83  consistent variable naming in _chunk_getitem (shikharsg, Nov 11, 2018)
1b67e90  removed unnecesary code from _set_basic_selection_zd and added encode… (shikharsg, Nov 11, 2018)
9b0cc29  flake 8 fixes (shikharsg, Nov 11, 2018)
715f86d  Merge remote-tracking branch 'upstream/master' into chunk_cache (shikharsg, Nov 13, 2018)
0013f95  fixed coverage (shikharsg, Nov 14, 2018)
b70c348  Merge branch 'chunk_cache' into master (shikharsg, Nov 14, 2018)
c4f2487  Merge pull request #4 from shikharsg/master (shikharsg, Nov 14, 2018)
245f661  Merge branch 'master' into chunk_cache (shikharsg, Nov 15, 2018)
a2a05fb  Merge branch 'master' into chunk_cache (shikharsg, Jan 8, 2019)
b8b9056  Merge branch 'chunk_cache' into chunk_cache_mapping_refactor (shikharsg, Jan 9, 2019)
04f0367  Merge pull request #3 from shikharsg/chunk_cache_mapping_refactor (shikharsg, Jan 9, 2019)
f19d43e  bug fix (shikharsg, Jan 27, 2019)
52a43bf  Merge branch 'master' into chunk_cache (shikharsg, Jan 27, 2019)
697d46e  python 2 and 3 compatibility (shikharsg, Jan 27, 2019)
1e727c7  Merge branch 'master' into chunk_cache (shikharsg, Feb 10, 2019)
377ece7  coverage fix and __init__.py LRUChunkCache import (shikharsg, Feb 10, 2019)
3473adb  merged chunk_cache with master (shikharsg, Mar 4, 2019)
df84c89  flake8 fix (shikharsg, Mar 4, 2019)
88fe66d  Merge branch 'master' into chunk_cache (Mar 30, 2019)
a816014  Implemented https://github.com/zarr-developers/zarr/pull/306/files#r2… (Apr 11, 2019)
8cc083b  cache tests refactor (May 3, 2019)
23fcdea  fixed minor tests mistak (May 3, 2019)
309cc48  Merge branch 'master' into chunk_cache (May 3, 2019)
635ec87  flake8 fix (May 3, 2019)
a85d156  Merge remote-tracking branch 'upstream/master' into chunk_cache (Aug 20, 2019)
ef86184  merged with master (Oct 30, 2019)
875c24f  added chunk cache to Group (Nov 20, 2019)
dcd4ee7  merge with master (Nov 20, 2019)
4a1baa9  added chunk_cache to all relevant function (Nov 20, 2019)
e6540e1  Merge branch 'master' into chunk_cache (Dec 12, 2019)
9f9d176  merge with master (Sep 30, 2020)
6571382  fixed failing doctest (Sep 30, 2020)
8c8a69f  Merge remote-tracking branch 'origin/master' into pr-306 (joshmoore, Feb 19, 2021)
e0e5254  fixed setitem caching order (Feb 20, 2021)
992b48a  Merge branch 'master' into chunk_cache (jakirkham, Feb 21, 2021)
38ee622  refactor (Feb 21, 2021)
8b6a699  Merge branch 'master' into chunk_cache (Apr 5, 2021)
ba5c0ed  Merge 'origin/master' into pr-306 (joshmoore, Aug 27, 2021)
7cdce5f  Remove use of unittest (joshmoore, Aug 27, 2021)
06c899b  Merge branch 'master' into chunk_cache (joshmoore, Sep 23, 2021)

36 changes: 32 additions & 4 deletions zarr/core.py
@@ -103,7 +103,8 @@ class Array(object):
     """

     def __init__(self, store, path=None, read_only=False, chunk_store=None,
-                 synchronizer=None, cache_metadata=True, cache_attrs=True):
+                 synchronizer=None, cache_metadata=True, cache_attrs=True,
+                 chunk_cache=None):
         # N.B., expect at this point store is fully initialized with all
         # configuration metadata fully specified and normalized

@@ -118,6 +119,7 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None,
         self._synchronizer = synchronizer
         self._cache_metadata = cache_metadata
         self._is_view = False
+        self._chunk_cache = chunk_cache

         # initialize metadata
         self._load_metadata()
@@ -1562,8 +1564,21 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
         ckey = self._chunk_key(chunk_coords)

         try:
+
+            cdata = None
+            chunk_was_cached = False
+
+            # first try getting from cache (if one has been provided)
+            if self._chunk_cache is not None:
+                try:
+                    cdata = self._chunk_cache[ckey]
+                    chunk_was_cached = True
+                except KeyError:
+                    pass
+
             # obtain compressed data for chunk
-            cdata = self.chunk_store[ckey]
+            if not chunk_was_cached:
+                cdata = self.chunk_store[ckey]

         except KeyError:
             # chunk not initialized
@@ -1593,19 +1608,30 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
                     # contiguous, so we can decompress directly from the chunk
                     # into the destination array

-                    if self._compressor:
+                    if chunk_was_cached:
+                        np.copyto(dest, cdata)
+                    elif self._compressor:
                         self._compressor.decode(cdata, dest)
+                        if self._chunk_cache is not None:
+                            self._chunk_cache[ckey] = np.copy(dest)
                     else:
                         if isinstance(cdata, np.ndarray):
                             chunk = cdata.view(self._dtype)
                         else:
                             chunk = np.frombuffer(cdata, dtype=self._dtype)
                         chunk = chunk.reshape(self._chunks, order=self._order)
                         np.copyto(dest, chunk)
+                        if self._chunk_cache is not None:
+                            self._chunk_cache[ckey] = np.copy(chunk)
                     return

             # decode chunk
-            chunk = self._decode_chunk(cdata)
+            if not chunk_was_cached:
+                chunk = self._decode_chunk(cdata)
+                if self._chunk_cache is not None:
+                    self._chunk_cache[ckey] = np.copy(chunk)
+            else:
+                chunk = cdata

             # select data from chunk
             if fields:
Expand Down Expand Up @@ -1714,6 +1740,8 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=Non
else:
chunk[chunk_selection] = value

if self._chunk_cache is not None:
self._chunk_cache[ckey] = np.copy(chunk)
# encode chunk
cdata = self._encode_chunk(chunk)

Expand Down
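Taken together, the core.py changes let callers supply a cache when constructing an array. A minimal usage sketch, assuming this branch is installed (`Array`, `init_array`, and `ChunkCache` as defined in this PR; the plain `dict` store mirrors the PR's own tests):

```python
import numpy as np

from zarr.core import Array
from zarr.storage import init_array, ChunkCache

# plain in-memory store, as used in this PR's tests
store = dict()
init_array(store, shape=(1000, 1000), chunks=(100, 100), dtype='f8')

# keep up to ~64 MiB of decoded chunks in memory
z = Array(store, chunk_cache=ChunkCache(max_size=2**26))

z[:] = np.random.random((1000, 1000))  # chunks are cached as they are written
_ = z[0:100, 0:100]                    # served from the cache, skipping store access and decode
```

On a local dict store the benefit is negligible; the cache is aimed at stores where each `chunk_store[ckey]` access plus decode is expensive, e.g. remote stores.
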
118 changes: 118 additions & 0 deletions zarr/storage.py
@@ -1883,3 +1883,121 @@ def __delitem__(self, key):
         with self._mutex:
             self._invalidate_keys()
             self._invalidate_value(key)

(The ChunkCache class below is entirely new code; the hunk contains no deletions.)


class ChunkCache(MutableMapping):
    """Storage class that implements a least-recently-used (LRU) cache for array
    chunks. Intended primarily for use with stores that can be slow to access,
    e.g., remote stores that require network communication to store and retrieve
    data.

    Parameters
    ----------
    max_size : int
        The maximum size that the cache may grow to, in number of bytes. Provide
        `None` if you would like the cache to have unlimited size.

    """

    def __init__(self, max_size):
        self._max_size = max_size
        self._current_size = 0
        self._values_cache = OrderedDict()
        self._mutex = Lock()
        self.hits = self.misses = 0

    def __getstate__(self):
        return (self._max_size, self._current_size,
                self._values_cache, self.hits,
                self.misses)

    def __setstate__(self, state):
        (self._max_size, self._current_size,
         self._values_cache, self.hits,
         self.misses) = state
        self._mutex = Lock()

    def __len__(self):
        return len(self._keys())

    def __iter__(self):
        return self.keys()

    def __contains__(self, key):
        with self._mutex:
            return key in self._keys()

    def clear(self):
        self.invalidate()

    def keys(self):
        with self._mutex:
            return iter(self._keys())

    def _keys(self):
        return self._values_cache.keys()

    def _pop_value(self):
        # remove the first value from the cache, as this will be the least
        # recently used value
        _, v = self._values_cache.popitem(last=False)
        return v

    def _accommodate_value(self, value_size):
        if self._max_size is None:
            return
        # ensure there is enough space in the cache for a new value
        while self._current_size + value_size > self._max_size:
            v = self._pop_value()
            self._current_size -= buffer_size(v)

    def _cache_value(self, key, value):
        # cache a value
        value_size = buffer_size(value)
        # check size of the value against max size, as if the value itself
        # exceeds max size then we are never going to cache it
        if self._max_size is None or value_size <= self._max_size:
            self._accommodate_value(value_size)
            self._values_cache[key] = value
            self._current_size += value_size

    def invalidate(self):
        """Completely clear the cache."""
        with self._mutex:
            self._values_cache.clear()
            # values are gone, so reset the size accounting
            self._current_size = 0

    def invalidate_values(self):
        """Clear the values cache."""
        with self._mutex:
            self._values_cache.clear()
            self._current_size = 0

    def _invalidate_value(self, key):
        if key in self._values_cache:
            value = self._values_cache.pop(key)
            self._current_size -= buffer_size(value)

    def __getitem__(self, key):
        try:
            # try to obtain the value from the cache
            with self._mutex:
                value = self._values_cache[key]
                # cache hit if no KeyError is raised
                self.hits += 1
                # treat the end as most recently used
                OrderedDict_move_to_end(self._values_cache, key)

        except KeyError:
            # cache miss
            with self._mutex:
                self.misses += 1
            raise KeyError(key)

        return value

    def __setitem__(self, key, value):
        with self._mutex:
            self._invalidate_value(key)
            self._cache_value(key, value)

    def __delitem__(self, key):
        with self._mutex:
            self._invalidate_value(key)
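
Not part of the diff, but the following sketch may help reviewers see the LRU accounting in isolation. It assumes this branch is installed so that `ChunkCache` is importable from `zarr.storage`, and that `buffer_size` reports a NumPy array's `nbytes` (8000 bytes for 1000 float64 values):

```python
import numpy as np

from zarr.storage import ChunkCache

# room for exactly two 8000-byte chunks
cache = ChunkCache(max_size=16000)

cache['0.0'] = np.zeros(1000)      # 8000 bytes cached
cache['0.1'] = np.ones(1000)       # cache now full at 16000 bytes
cache['0.2'] = np.full(1000, 2.0)  # evicts '0.0', the least recently used entry

try:
    cache['0.0']                   # KeyError: evicted above
except KeyError:
    pass

_ = cache['0.2']                   # still cached
print(cache.hits, cache.misses)    # 1 1 -- only __getitem__ updates the counters
```

Note that `__setitem__` never counts as a hit or a miss, which is what lets the `test_hit_miss` test below distinguish the write cache from the read cache.
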
47 changes: 46 additions & 1 deletion zarr/tests/test_core.py
@@ -16,7 +16,7 @@

 from zarr.storage import (DirectoryStore, init_array, init_group, NestedDirectoryStore,
                           DBMStore, LMDBStore, atexit_rmtree, atexit_rmglob,
-                          LRUStoreCache)
+                          LRUStoreCache, ChunkCache)
 from zarr.core import Array
 from zarr.errors import PermissionError
 from zarr.compat import PY2, text_type, binary_type
@@ -1698,3 +1698,48 @@ def create_array(read_only=False, **kwargs):
        init_array(store, **kwargs)
        return Array(store, read_only=read_only, cache_metadata=cache_metadata,
                     cache_attrs=cache_attrs)


class TestArrayWithChunkCache(TestArray):

    @staticmethod
    def create_array(read_only=False, **kwargs):
        store = dict()
        kwargs.setdefault('compressor', Zlib(level=1))
        cache_metadata = kwargs.pop('cache_metadata', True)
        cache_attrs = kwargs.pop('cache_attrs', True)
        init_array(store, **kwargs)
        return Array(store, read_only=read_only, cache_metadata=cache_metadata,
                     cache_attrs=cache_attrs, chunk_cache=ChunkCache(max_size=None))

    @staticmethod
    def create_array_with_cache(read_only=False, **kwargs):
        store = dict()
        kwargs.setdefault('compressor', Zlib(level=1))
        cache_metadata = kwargs.pop('cache_metadata', True)
        cache_attrs = kwargs.pop('cache_attrs', True)
        init_array(store, **kwargs)
        cache = ChunkCache(max_size=None)
        return Array(store, read_only=read_only, cache_metadata=cache_metadata,
                     cache_attrs=cache_attrs, chunk_cache=cache), cache

    def test_hit_miss(self):
        a = np.arange(100).reshape((10, 10))
        z, cache = self.create_array_with_cache(shape=a.shape, chunks=(10, 1),
                                                dtype=a.dtype)

        # test write cache: chunks are cached on write, so reads hit immediately
        z[:] = a
        assert cache.misses == 0 and cache.hits == 0
        _ = z[:]
        assert cache.misses == 0 and cache.hits == 10

        cache.clear()
        cache.misses = 0
        cache.hits = 0

        # test read cache: first read misses and fills the cache, second read hits
        assert cache.misses == 0 and cache.hits == 0
        _ = z[:]
        assert cache.misses == 10 and cache.hits == 0
        _ = z[:]
        assert cache.misses == 10 and cache.hits == 10