add support for bitshuffle (#286)
* add support for bitshuffle

* add bitshuffle to integ tests

* took out extra debug log

* fix flake8 error
jreadey authored Nov 10, 2023
1 parent a6c2c26 commit 45bdd6f
Showing 8 changed files with 310 additions and 104 deletions.
34 changes: 19 additions & 15 deletions hsds/util/dsetUtil.py
@@ -40,6 +40,7 @@
("H5Z_FILTER_SNAPPY", 32003, "snappy"),
("H5Z_FILTER_LZ4", 32004, "lz4"),
("H5Z_FILTER_LZ4HC", 32005, "lz4hc"),
("H5Z_FILTER_BITSHUFFLE", 32008, "bitshuffle"),
("H5Z_FILTER_ZSTD", 32015, "zstd"),
)
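
For context, here is a minimal standalone sketch of how the new entry participates in filter lookup. The lookup_filter helper below is a hypothetical stand-in for this module's getFilterItem, and only the entries visible in this hunk are included:

# Hypothetical illustration: a simplified lookup over the filter entries shown above.
FILTER_DEFS = (
    ("H5Z_FILTER_SNAPPY", 32003, "snappy"),
    ("H5Z_FILTER_LZ4", 32004, "lz4"),
    ("H5Z_FILTER_LZ4HC", 32005, "lz4hc"),
    ("H5Z_FILTER_BITSHUFFLE", 32008, "bitshuffle"),
    ("H5Z_FILTER_ZSTD", 32015, "zstd"),
)

def lookup_filter(key):
    # match on the filter class name, the numeric id, or the short name
    for filter_class, filter_id, filter_name in FILTER_DEFS:
        if key in (filter_class, filter_id, filter_name):
            return {"class": filter_class, "id": filter_id, "name": filter_name}
    return None

print(lookup_filter("bitshuffle"))   # {'class': 'H5Z_FILTER_BITSHUFFLE', 'id': 32008, 'name': 'bitshuffle'}
print(lookup_filter(32008)["name"])  # bitshuffle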

@@ -78,6 +79,7 @@ def getFilterItem(key):
"""
Return filter code, id, and name, based on an id, a name or a code.
"""

if key == "deflate":
key = "gzip" # use gzip as equivalent
for item in FILTER_DEFS:
@@ -123,14 +125,15 @@ def getCompressionFilter(dset_json):
def getShuffleFilter(dset_json):
"""Return shuffle filter, or None"""
filters = getFilters(dset_json)
FILTER_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE")
for filter in filters:
try:
if filter["class"] == "H5Z_FILTER_SHUFFLE":
log.debug(f"Shuffle filter is used: {filter}")
return filter
except KeyError:
if "class" not in filter:
log.warn(f"filter option: {filter} with no class key")
continue
filter_class = filter["class"]
if filter_class in FILTER_CLASSES:
return filter

log.debug("Shuffle filter not used")
return None
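
For illustration, a self-contained sketch of the logic above: find_shuffle_filter is a hypothetical stand-in for getShuffleFilter, and the filters list stands in for what getFilters(dset_json) would return.

# Hypothetical, simplified version of the shuffle-filter lookup shown above.
SHUFFLE_CLASSES = ("H5Z_FILTER_SHUFFLE", "H5Z_FILTER_BITSHUFFLE")

def find_shuffle_filter(filters):
    for f in filters:
        if "class" not in f:
            continue  # the real code logs a warning for entries with no class key
        if f["class"] in SHUFFLE_CLASSES:
            return f
    return None

filters = [
    {"class": "H5Z_FILTER_DEFLATE", "id": 1, "name": "gzip", "level": 5},
    {"class": "H5Z_FILTER_BITSHUFFLE", "id": 32008, "name": "bitshuffle"},
]
print(find_shuffle_filter(filters)["name"])  # bitshuffle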

@@ -149,17 +152,21 @@ def getFilterOps(app, dset_json, item_size):
filter_ops = {}

shuffleFilter = getShuffleFilter(dset_json)
if shuffleFilter:
filter_ops["use_shuffle"] = True
if shuffleFilter and item_size != "H5T_VARIABLE":
shuffle_name = shuffleFilter["name"]
if shuffle_name == "shuffle":
filter_ops["shuffle"] = 1 # use regular shuffle
elif shuffle_name == "bitshuffle":
filter_ops["shuffle"] = 2 # use bitshuffle
else:
log.warn(f"unexpected shuffleFilter: {shuffle_name}")
filter_ops["shuffle"] = 0 # no shuffle
else:
filter_ops["shuffle"] = 0 # no shuffle

if compressionFilter:
if compressionFilter["class"] == "H5Z_FILTER_DEFLATE":
filter_ops["compressor"] = "zlib" # blosc compressor
if shuffleFilter:
filter_ops["use_shuffle"] = True
else:
# for HDF5-style compression, use shuffle only if it's turned on
filter_ops["use_shuffle"] = False
else:
if "name" in compressionFilter:
filter_ops["compressor"] = compressionFilter["name"]
@@ -170,10 +177,7 @@
else:
filter_ops["level"] = int(compressionFilter["level"])

if filter_ops:
filter_ops["item_size"] = item_size
if item_size == "H5T_VARIABLE":
filter_ops["use_shuffle"] = False
log.debug(f"save filter ops: {filter_ops} for {dset_id}")
filter_map[dset_id] = filter_ops # save
return filter_ops
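
As a rough sketch of the outcome (assuming a gzip-compressed, bitshuffled dataset with a 4-byte fixed-size type; the key names follow the code above), the filter_ops dict cached here would look something like:

# Hypothetical filter_ops produced by getFilterOps for gzip + bitshuffle on a 4-byte type
filter_ops = {
    "compressor": "zlib",  # H5Z_FILTER_DEFLATE is handled by blosc's "zlib"
    "level": 5,            # compression level taken from the filter definition
    "shuffle": 2,          # 0 = no shuffle, 1 = byte shuffle, 2 = bitshuffle
    "item_size": 4,        # element size in bytes; variable-length types disable shuffling
}
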
206 changes: 128 additions & 78 deletions hsds/util/storUtil.py
@@ -17,10 +17,11 @@
import time
import json
import zlib
import numpy as np
import numcodecs as codecs
from bitshuffle import bitshuffle, bitunshuffle
from aiohttp.web_exceptions import HTTPInternalServerError


from .. import hsds_logger as log
from .s3Client import S3Client

@@ -44,6 +45,9 @@ def FileClient(app):

from .. import config

BYTE_SHUFFLE = 1
BIT_SHUFFLE = 2


def getCompressors():
"""return available compressors"""
@@ -65,6 +69,7 @@ def getCompressors():
def getSupportedFilters(include_compressors=True):
"""return list of other supported filters"""
filters = [
"bitshuffle",
"shuffle",
"fletcher32",
]
@@ -85,18 +90,118 @@ def getBloscThreads():
return nthreads


def _shuffle(element_size, chunk):
shuffler = codecs.Shuffle(element_size)
arr = shuffler.encode(chunk)
def _shuffle(codec, data, item_size=4):
if codec == 1:
# byte shuffle, use numcodecs Shuffle
shuffler = codecs.Shuffle(item_size)
arr = shuffler.encode(data)
elif codec == 2:
# bit shuffle, use bitshuffle package
if isinstance(data, bytes):
# bitshuffle is expecting a numpy array
data = np.frombuffer(data, dtype=np.dtype("uint8"))
arr = bitshuffle(data)
else:
log.error(f"Unexpected codec: {codec} for _shuffle")
raise ValueError()
return arr.tobytes()


def _unshuffle(element_size, chunk):
shuffler = codecs.Shuffle(element_size)
arr = shuffler.decode(chunk)
def _unshuffle(codec, data, item_size=4):
if codec == 1:
# byte shuffle, use numcodecs Shuffle
shuffler = codecs.Shuffle(item_size)
arr = shuffler.decode(data)
elif codec == 2:
# bit shuffle, use bitshuffle
if isinstance(data, bytes):
# bitshuffle is expecting a numpy array
data = np.frombuffer(data, dtype=np.dtype("uint8"))
arr = bitunshuffle(data)

return arr.tobytes()
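
To make the two codec paths concrete, here is a small round-trip check. It is a sketch that assumes the numpy, numcodecs, and bitshuffle packages imported at the top of this module:

import numpy as np
import numcodecs as codecs
from bitshuffle import bitshuffle, bitunshuffle

data = np.arange(1024, dtype=np.uint32).tobytes()  # 4 KiB of sample chunk bytes

# codec 1: byte shuffle via numcodecs round-trips back to the original bytes
shuffler = codecs.Shuffle(4)
assert bytes(shuffler.decode(shuffler.encode(data))) == data

# codec 2: bit shuffle via the bitshuffle package, which expects a numpy array
arr = np.frombuffer(data, dtype=np.uint8)
assert bitunshuffle(bitshuffle(arr)).tobytes() == data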


def _uncompress(data, compressor=None, shuffle=0, item_size=4):
""" Uncompress the provided data using compessor and/or shuffle """
log.debug(f"_uncompress(compressor={compressor}, shuffle={shuffle})")
if compressor:
if compressor in ("gzip", "deflate"):
# blosc refers to this as zlib
compressor = "zlib"
# first check if this was compressed with blosc
# returns typesize, isshuffle, and memcopied
blosc_metainfo = codecs.blosc.cbuffer_metainfo(data)
if blosc_metainfo[0] > 0:
log.info(f"blosc compressed data for {len(data)} bytes")
try:
blosc = codecs.Blosc()
udata = blosc.decode(data)
log.info(f"blosc uncompressed to {len(udata)} bytes")
data = udata
if shuffle == BYTE_SHUFFLE:
shuffle = 0 # blosc will unshuffle the bytes for us
except Exception as e:
msg = f"got exception: {e} using blosc decompression"
log.error(msg)
raise HTTPInternalServerError()
elif compressor == "zlib":
# data may have been compressed without blosc,
# try using zlib directly
log.info(f"using zlib to decompress {len(data)} bytes")
try:
udata = zlib.decompress(data)
log.info(f"uncompressed to {len(udata)} bytes")
data = udata
except zlib.error as zlib_error:
log.info(f"zlib_err: {zlib_error}")
log.error("unable to uncompress data with zlib")
raise HTTPInternalServerError()
else:
msg = f"don't know how to decompress data in {compressor} "
log.error(msg)
raise HTTPInternalServerError()
if shuffle:
start_time = time.time()
data = _unshuffle(shuffle, data, item_size=item_size)
finish_time = time.time()
elapsed = finish_time - start_time
msg = f"unshuffled {len(data)} bytes, {(elapsed):.2f} elapsed"
log.debug(msg)

return data


def _compress(data, compressor=None, clevel=5, shuffle=0, item_size=4):
log.debug(f"_uncompress(compressor={compressor}, shuffle={shuffle})")
if shuffle == 2:
# bit shuffle the data before applying the compressor
log.debug("bitshuffling data")
data = _shuffle(shuffle, data, item_size=item_size)
shuffle = 0 # don't do any blosc shuffling

if compressor:
if compressor in ("gzip", "deflate"):
# blosc refers to this as zlib
compressor = "zlib"
cdata = None
# try with blosc compressor
try:
blosc = codecs.Blosc(cname=compressor, clevel=clevel, shuffle=shuffle)
cdata = blosc.encode(data)
msg = f"compressed from {len(data)} bytes to {len(cdata)} bytes "
msg += f"using filter: {blosc.cname} with level: {blosc.clevel}"
log.info(msg)
except Exception as e:
log.error(f"got exception using blosc encoding: {e}")
raise HTTPInternalServerError()

if cdata is not None:
data = cdata # use compressed data

return data
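
As a usage sketch for the pair above (assuming an installed hsds environment so the module's private helpers and the BIT_SHUFFLE constant can be imported), a bitshuffled, gzip-compressed chunk should round-trip:

# Illustrative round trip through the helpers above: bitshuffle + gzip, then reverse.
import numpy as np
from hsds.util.storUtil import _compress, _uncompress, BIT_SHUFFLE

chunk = np.arange(4096, dtype=np.uint32).tobytes()
packed = _compress(chunk, compressor="gzip", clevel=5, shuffle=BIT_SHUFFLE, item_size=4)
restored = _uncompress(packed, compressor="gzip", shuffle=BIT_SHUFFLE, item_size=4)
assert restored == chunk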


def _getStorageClient(app):
"""get storage client s3 or azure blob"""

@@ -217,58 +322,6 @@ async def getStorJSONObj(app, key, bucket=None):
return json_dict


def _uncompress(data, compressor=None, shuffle=0):
""" Uncompress the provided data using compessor and/or shuffle """
if compressor:
# compressed chunk data...

# first check if this was compressed with blosc
# returns typesize, isshuffle, and memcopied
blosc_metainfo = codecs.blosc.cbuffer_metainfo(data)
if blosc_metainfo[0] > 0:
log.info(f"blosc compressed data for {len(data)} bytes")
try:
blosc = codecs.Blosc()
udata = blosc.decode(data)
log.info(f"uncompressed to {len(udata)} bytes")
data = udata
shuffle = 0 # blosc will unshuffle the bytes for us
except Exception as e:
msg = f"got exception: {e} using blosc decompression"
log.error(msg)
raise HTTPInternalServerError()
elif compressor == "zlib":
# data may have been compressed without blosc,
# try using zlib directly
log.info(f"using zlib to decompress {len(data)} bytes")
try:
udata = zlib.decompress(data)
log.info(f"uncompressed to {len(udata)} bytes")
data = udata
except zlib.error as zlib_error:
log.info(f"zlib_err: {zlib_error}")
log.error("unable to uncompress data with zlib")
raise HTTPInternalServerError()
else:
msg = f"don't know how to decompress data in {compressor} "
log.error(msg)
raise HTTPInternalServerError()

if shuffle > 0:
log.debug(f"shuffle is {shuffle}")
start_time = time.time()
unshuffled = _unshuffle(shuffle, data)
if unshuffled is not None:
log.debug(f"unshuffled to {len(unshuffled)} bytes")
data = unshuffled
finish_time = time.time()
elapsed = finish_time - start_time
msg = f"unshuffled {len(data)} bytes, {(elapsed):.2f} elapsed"
log.debug(msg)

return data


async def getStorBytes(app,
key,
filter_ops=None,
@@ -294,15 +347,20 @@
log.info(msg)

shuffle = 0
item_size = 4
compressor = None
if filter_ops:
log.debug(f"getStorBytes for {key} with filter_ops: {filter_ops}")
if "use_shuffle" in filter_ops and filter_ops["use_shuffle"]:
if filter_ops.get("shuffle") == "shuffle":
shuffle = filter_ops["item_size"]
log.debug("using shuffle filter")
elif filter_ops.get("shuffle") == "bitshuffle":
shuffle = 2
log.debug("using bitshuffle filter")
if "compressor" in filter_ops:
compressor = filter_ops["compressor"]
log.debug(f"using compressor: {compressor}")
item_size = filter_ops["item_size"]

kwargs = {"bucket": bucket, "key": key, "offset": offset, "length": length}

@@ -336,7 +394,8 @@
m = n + chunk_location.length
log.debug(f"getStorBytes - extracting chunk from data[{n}:{m}]")
h5_bytes = data[n:m]
h5_bytes = _uncompress(h5_bytes, compressor=compressor, shuffle=shuffle)
kwargs = {"compressor": compressor, "shuffle": shuffle, "item_size": item_size}
h5_bytes = _uncompress(h5_bytes, **kwargs)
if len(h5_bytes) != h5_size:
msg = f"expected chunk index: {chunk_location.index} to have size: "
msg += f"{h5_size} but got: {len(h5_bytes)}"
@@ -353,7 +412,7 @@
# chunk_bytes got updated, so just return None
return None
else:
data = _uncompress(data, compressor=compressor, shuffle=shuffle)
data = _uncompress(data, compressor=compressor, shuffle=shuffle, item_size=item_size)
return data


@@ -365,32 +424,23 @@ async def putStorBytes(app, key, data, filter_ops=None, bucket=None):
bucket = app["bucket_name"]
if key[0] == "/":
key = key[1:] # no leading slash
shuffle = -1 # auto-shuffle
shuffle = 0
clevel = 5
cname = None # compressor name
item_size = 4
if filter_ops:
if "compressor" in filter_ops:
cname = filter_ops["compressor"]
if "use_shuffle" in filter_ops and not filter_ops["use_shuffle"]:
shuffle = 0 # client indicates to turn off shuffling
if "shuffle" in filter_ops:
shuffle = filter_ops["shuffle"]
if "level" in filter_ops:
clevel = filter_ops["level"]
item_size = filter_ops["item_size"]
msg = f"putStorBytes({bucket}/{key}), {len(data)} bytes shuffle: {shuffle}"
msg += f" compressor: {cname} level: {clevel}"
msg += f" compressor: {cname} level: {clevel}, item_size: {item_size}"
log.info(msg)

if cname:
try:
blosc = codecs.Blosc(cname=cname, clevel=clevel, shuffle=shuffle)
cdata = blosc.encode(data)
# TBD: add cname in blosc constructor
msg = f"compressed from {len(data)} bytes to {len(cdata)} bytes "
msg += f"using filter: {blosc.cname} with level: {blosc.clevel}"
log.info(msg)
data = cdata
except Exception as e:
log.error(f"got exception using blosc encoding: {e}")
raise HTTPInternalServerError()
data = _compress(data, compressor=cname, clevel=clevel, shuffle=shuffle, item_size=item_size)

rsp = await client.put_object(key, data, bucket=bucket)

1 change: 1 addition & 0 deletions pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
"aiohttp_cors",
"aiofiles",
"azure-storage-blob",
"bitshuffle",
"botocore",
"cryptography",
"numcodecs",
2 changes: 1 addition & 1 deletion testall.py
@@ -15,7 +15,7 @@

PYTHON_CMD = "python" # change to "python3" if "python" invokes python version 2.x

unit_tests = ('array_util_test', 'chunk_util_test', 'domain_util_test',
unit_tests = ('array_util_test', 'chunk_util_test', 'compression_test', 'domain_util_test',
'dset_util_test', 'hdf5_dtype_test', 'id_util_test', 'lru_cache_test',
'shuffle_test', 'rangeget_util_test')
