Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of nvCOMP batch codec. #293

Merged
merged 8 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 52 additions & 85 deletions python/kvikio/_lib/libnvcomp_ll.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,11 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from enum import IntEnum

from libc.stdint cimport uint32_t, uint64_t, uintptr_t
from libc.stdint cimport uint32_t, uintptr_t

from kvikio._lib.nvcomp_ll_cxx_api cimport (
cudaMemcpyKind,
cudaStream_t,
nvcompStatus_t,
nvcompType_t,
)
from kvikio._lib.nvcomp_ll_cxx_api cimport cudaStream_t, nvcompStatus_t, nvcompType_t

import cupy
from cupy.cuda.runtime import memcpyAsync


class nvCompStatus(IntEnum):
Expand Down Expand Up @@ -143,34 +137,27 @@ class nvCompBatchAlgorithm(ABC):
The number of chunks to compress.
temp_buf: cp.ndarray
The temporary GPU workspace.
comp_chunks: np.ndarray[uintp]
comp_chunks: cp.ndarray[uintp]
(output) The list of pointers on the GPU, to the output location for each
compressed batch item.
comp_chunk_sizes: np.ndarray[uint64]
comp_chunk_sizes: cp.ndarray[uint64]
(output) The compressed size in bytes of each chunk.
stream: cp.cuda.Stream
CUDA stream.
"""

# nvCOMP requires comp_chunks pointers container and
# comp_chunk_sizes to be in GPU memory.
comp_chunks_d = cupy.array(comp_chunks, dtype=cupy.uintp)
comp_chunk_sizes_d = cupy.empty_like(comp_chunk_sizes)

err = self._compress(
uncomp_chunks,
uncomp_chunk_sizes,
max_uncomp_chunk_bytes,
batch_size,
temp_buf,
comp_chunks_d,
comp_chunk_sizes_d,
comp_chunks,
comp_chunk_sizes,
stream,
)
if err != nvcompStatus_t.nvcompSuccess:
raise RuntimeError(f"Compression failed, error: {nvCompStatus(err)!r}.")
# Copy resulting compressed chunk sizes back to the host buffer.
comp_chunk_sizes[:] = comp_chunk_sizes_d.get()

@abstractmethod
def _compress(
Expand Down Expand Up @@ -238,9 +225,9 @@ class nvCompBatchAlgorithm(ABC):

Parameters
----------
comp_chunks: np.ndarray[uintp]
comp_chunks: cp.ndarray[uintp]
The pointers on the GPU, to compressed batched items.
comp_chunk_sizes: np.ndarray[uint64]
comp_chunk_sizes: cp.ndarray[uint64]
The size in bytes of each compressed batch item.
stream: cp.cuda.Stream
CUDA stream.
Expand All @@ -256,23 +243,21 @@ class nvCompBatchAlgorithm(ABC):
batch_size = len(comp_chunks)

# nvCOMP requires all buffers to be in GPU memory.
comp_chunks_d = cupy.array(comp_chunks, dtype=cupy.uintp)
comp_chunk_sizes_d = cupy.array(comp_chunk_sizes, dtype=cupy.uint64)
uncomp_chunk_sizes_d = cupy.empty_like(comp_chunk_sizes_d)
uncomp_chunk_sizes = cupy.empty_like(comp_chunk_sizes)

err = self._get_decomp_size(
comp_chunks_d,
comp_chunk_sizes_d,
comp_chunks,
comp_chunk_sizes,
batch_size,
uncomp_chunk_sizes_d,
uncomp_chunk_sizes,
stream,
)
if err != nvcompStatus_t.nvcompSuccess:
raise RuntimeError(
f"Could not get decompress buffer size, error: {nvCompStatus(err)!r}."
)

return uncomp_chunk_sizes_d
return uncomp_chunk_sizes

@abstractmethod
def _get_decomp_size(
Expand Down Expand Up @@ -302,9 +287,9 @@ class nvCompBatchAlgorithm(ABC):

Parameters
----------
comp_chunks: np.ndarray[uintp]
comp_chunks: cp.ndarray[uintp]
The pointers on the GPU, to compressed batched items.
comp_chunk_sizes: np.ndarray[uint64]
comp_chunk_sizes: cp.ndarray[uint64]
The size in bytes of each compressed batch item.
batch_size: int
The number of chunks to decompress.
Expand All @@ -323,14 +308,9 @@ class nvCompBatchAlgorithm(ABC):
CUDA stream.
"""

# nvCOMP requires comp_chunks pointers container and
# comp_chunk_sizes to be in GPU memory.
comp_chunks_d = cupy.array(comp_chunks, dtype=cupy.uintp)
comp_chunk_sizes_d = cupy.array(comp_chunk_sizes, dtype=cupy.uint64)

err = self._decompress(
comp_chunks_d,
comp_chunk_sizes_d,
comp_chunks,
comp_chunk_sizes,
batch_size,
temp_buf,
uncomp_chunks,
Expand Down Expand Up @@ -407,6 +387,31 @@ class nvCompBatchAlgorithmLZ4(nvCompBatchAlgorithm):
self.options = nvcompBatchedLZ4Opts_t(data_type)
self.has_header = has_header

# Note on LZ4 header structure: numcodecs LZ4 codec prepends
# a 4-byte (uint32_t) header to each compressed chunk.
# The header stores the size of the original (uncompressed) data:
# https://github.com/zarr-developers/numcodecs/blob/cb155432e36536e17a2d054c8c24b7bf6f4a7347/numcodecs/lz4.pyx#L89
#
# The following CUDA kernels read / write chunk header by
# casting the chunk pointer to a pointer to unsigned int.

# CUDA kernel that copies uncompressed chunk size from the chunk header.
self._get_size_from_header_kernel = cupy.ElementwiseKernel(
"uint64 comp_chunk_ptr",
"uint64 uncomp_chunk_size",
            "uncomp_chunk_size = *((unsigned int *)comp_chunk_ptr)",
            "get_size_from_header",
        )

# CUDA kernel that copies uncompressed chunk size to the chunk header.
self._set_chunk_size_header_kernel = cupy.ElementwiseKernel(
"uint64 uncomp_chunk_size",
"uint64 comp_chunk_ptr",
"((unsigned int *)comp_chunk_ptr)[0] = (unsigned int)uncomp_chunk_size",
"set_chunk_size_header",
no_return=True,
)

def _get_comp_temp_size(
self,
size_t batch_size,
Expand Down Expand Up @@ -455,20 +460,9 @@ class nvCompBatchAlgorithmLZ4(nvCompBatchAlgorithm):
# 2. Update target pointers in comp_chunks to skip the header portion,
# which is not compressed.
#
# Get the base pointers to sizes.
psize = to_ptr(uncomp_chunk_sizes)
for i in range(batch_size):
# Copy the original data size to the header.
memcpyAsync(
<uintptr_t>comp_chunks[i],
psize,
self.HEADER_SIZE_BYTES,
cudaMemcpyKind.cudaMemcpyDeviceToDevice,
stream.ptr
)
psize += sizeof(uint64_t)
# Update chunk pointer to skip the header.
comp_chunks[i] += self.HEADER_SIZE_BYTES
self._set_chunk_size_header_kernel(uncomp_chunk_sizes, comp_chunks)
# Update chunk pointer to skip the header.
comp_chunks += self.HEADER_SIZE_BYTES

super().compress(
uncomp_chunks,
Expand All @@ -482,10 +476,9 @@ class nvCompBatchAlgorithmLZ4(nvCompBatchAlgorithm):
)

if self.has_header:
for i in range(batch_size):
# Update chunk pointer and size to include the header.
comp_chunks[i] -= self.HEADER_SIZE_BYTES
comp_chunk_sizes[i] += self.HEADER_SIZE_BYTES
# Update chunk pointer and size to include the header.
comp_chunks -= self.HEADER_SIZE_BYTES
comp_chunk_sizes += self.HEADER_SIZE_BYTES

def _compress(
self,
Expand Down Expand Up @@ -541,26 +534,7 @@ class nvCompBatchAlgorithmLZ4(nvCompBatchAlgorithm):
stream,
)

assert comp_chunks.shape == comp_chunk_sizes.shape
batch_size = len(comp_chunks)

# uncomp_chunk_sizes is uint32 array to match the type in LZ4 header.
uncomp_chunk_sizes = cupy.empty(batch_size, dtype=cupy.uint32)

psize = to_ptr(uncomp_chunk_sizes)
for i in range(batch_size):
# Get pointer to the header and copy the data.
memcpyAsync(
psize,
<uintptr_t>comp_chunks[i],
sizeof(uint32_t),
cudaMemcpyKind.cudaMemcpyDeviceToDevice,
stream.ptr
)
psize += sizeof(uint32_t)
stream.synchronize()

return uncomp_chunk_sizes.astype(cupy.uint64)
return self._get_size_from_header_kernel(comp_chunks)

def _get_decomp_size(
self,
Expand Down Expand Up @@ -591,10 +565,9 @@ class nvCompBatchAlgorithmLZ4(nvCompBatchAlgorithm):
stream,
):
if self.has_header:
for i in range(batch_size):
# Update chunk pointer and size to exclude the header.
comp_chunks[i] += self.HEADER_SIZE_BYTES
comp_chunk_sizes[i] -= self.HEADER_SIZE_BYTES
# Update chunk pointer and size to exclude the header.
comp_chunks += self.HEADER_SIZE_BYTES
comp_chunk_sizes -= self.HEADER_SIZE_BYTES

super().decompress(
comp_chunks,
Expand All @@ -608,12 +581,6 @@ class nvCompBatchAlgorithmLZ4(nvCompBatchAlgorithm):
stream,
)

if self.has_header:
for i in range(batch_size):
# Update chunk pointer and size to include the header.
comp_chunks[i] -= self.HEADER_SIZE_BYTES
comp_chunk_sizes[i] += self.HEADER_SIZE_BYTES

def _decompress(
self,
comp_chunks,
Expand Down
Loading