diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedCompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedCompressor.java new file mode 100644 index 00000000000..72dfcdb3cb5 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedCompressor.java @@ -0,0 +1,335 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf.nvcomp; + +import ai.rapids.cudf.BaseDeviceMemoryBuffer; +import ai.rapids.cudf.CloseableArray; +import ai.rapids.cudf.Cuda; +import ai.rapids.cudf.DeviceMemoryBuffer; +import ai.rapids.cudf.HostMemoryBuffer; +import ai.rapids.cudf.MemoryBuffer; +import ai.rapids.cudf.NvtxColor; +import ai.rapids.cudf.NvtxRange; + +/** Multi-buffer compressor */ +public abstract class BatchedCompressor { + + static final long MAX_CHUNK_SIZE = 16777216; // 16MiB in bytes + // each chunk has a 64-bit integer value as metadata containing the compressed size + static final long METADATA_BYTES_PER_CHUNK = 8; + + private final long chunkSize; + private final long maxIntermediateBufferSize; + private final long maxOutputChunkSize; + + /** + * Construct a batched compressor instance + * @param chunkSize maximum amount of uncompressed data to compress as a single chunk. + * Inputs larger than this will be compressed in multiple chunks. + * @param maxIntermediateBufferSize desired maximum size of intermediate device + * buffers used during compression. + */ + public BatchedCompressor(long chunkSize, long maxOutputChunkSize, + long maxIntermediateBufferSize) { + validateChunkSize(chunkSize); + assert maxOutputChunkSize < Integer.MAX_VALUE; + this.chunkSize = chunkSize; + this.maxOutputChunkSize = maxOutputChunkSize; + this.maxIntermediateBufferSize = Math.max(maxOutputChunkSize, maxIntermediateBufferSize); + } + + /** + * Compress a batch of buffers. The input buffers will be closed. + * @param origInputs buffers to compress + * @param stream CUDA stream to use + * @return compressed buffers corresponding to the input buffers + */ + public DeviceMemoryBuffer[] compress(BaseDeviceMemoryBuffer[] origInputs, Cuda.Stream stream) { + try (CloseableArray inputs = CloseableArray.wrap(origInputs)) { + if (chunkSize <= 0) { + throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); + } + final int numInputs = inputs.size(); + if (numInputs == 0) { + return new DeviceMemoryBuffer[0]; + } + + // Each buffer is broken up into chunkSize chunks for compression. Calculate how many + // chunks are needed for each input buffer. + int[] chunksPerInput = new int[numInputs]; + int numChunks = 0; + for (int i = 0; i < numInputs; i++) { + BaseDeviceMemoryBuffer buffer = inputs.get(i); + int numBufferChunks = getNumChunksInBuffer(buffer); + chunksPerInput[i] = numBufferChunks; + numChunks += numBufferChunks; + } + + // Allocate buffers for each chunk and generate parallel lists of chunk source addresses, + // chunk destination addresses, and sizes. + try (CloseableArray compressedBuffers = + allocCompressedBuffers(numChunks, stream); + DeviceMemoryBuffer compressedChunkSizes = + DeviceMemoryBuffer.allocate(numChunks * 8L, stream)) { + long[] inputChunkAddrs = new long[numChunks]; + long[] inputChunkSizes = new long[numChunks]; + long[] outputChunkAddrs = new long[numChunks]; + buildAddrsAndSizes(inputs, inputChunkAddrs, inputChunkSizes, compressedBuffers, + outputChunkAddrs); + + final long tempBufferSize = batchedCompressGetTempSize(numChunks, chunkSize); + try (DeviceMemoryBuffer addrsAndSizes = putAddrsAndSizesOnDevice(inputChunkAddrs, + inputChunkSizes, outputChunkAddrs, stream); + DeviceMemoryBuffer tempBuffer = + DeviceMemoryBuffer.allocate(tempBufferSize, stream)) { + final long devOutputAddrsPtr = addrsAndSizes.getAddress() + numChunks * 8L; + final long devInputSizesPtr = devOutputAddrsPtr + numChunks * 8L; + batchedCompressAsync(addrsAndSizes.getAddress(), devInputSizesPtr, chunkSize, + numChunks, tempBuffer.getAddress(), tempBufferSize, devOutputAddrsPtr, + compressedChunkSizes.getAddress(), stream.getStream()); + } + + // Synchronously copy the resulting compressed sizes per chunk. + long[] outputChunkSizes = getOutputChunkSizes(compressedChunkSizes, stream); + + // inputs are no longer needed at this point, so free them early + inputs.close(); + + // Combine compressed chunks into output buffers corresponding to each original input + return stitchOutput(chunksPerInput, compressedChunkSizes, outputChunkAddrs, + outputChunkSizes, stream); + } + } + } + + static void validateChunkSize(long chunkSize) { + if (chunkSize <= 0 || chunkSize > MAX_CHUNK_SIZE) { + throw new IllegalArgumentException("Invalid chunk size: " + chunkSize + + " Max chunk size is: " + MAX_CHUNK_SIZE + " bytes"); + } + } + + private static long ceilingDivide(long x, long y) { + return (x + y - 1) / y; + } + + private int getNumChunksInBuffer(MemoryBuffer buffer) { + return (int) ceilingDivide(buffer.getLength(), chunkSize); + } + + private CloseableArray allocCompressedBuffers(long numChunks, + Cuda.Stream stream) { + final long chunksPerBuffer = maxIntermediateBufferSize / maxOutputChunkSize; + final long numBuffers = ceilingDivide(numChunks, chunksPerBuffer); + if (numBuffers > Integer.MAX_VALUE) { + throw new IllegalStateException("Too many chunks"); + } + try (NvtxRange range = new NvtxRange("allocCompressedBuffers", NvtxColor.YELLOW)) { + CloseableArray buffers = CloseableArray.wrap( + new DeviceMemoryBuffer[(int) numBuffers]); + try { + // allocate all of the max-chunks intermediate compressed buffers + for (int i = 0; i < buffers.size() - 1; ++i) { + buffers.set(i, + DeviceMemoryBuffer.allocate(chunksPerBuffer * maxOutputChunkSize, stream)); + } + // allocate the tail intermediate compressed buffer that may be smaller than the others + buffers.set(buffers.size() - 1, DeviceMemoryBuffer.allocate( + (numChunks - chunksPerBuffer * (buffers.size() - 1)) * maxOutputChunkSize, stream)); + return buffers; + } catch (Exception e) { + buffers.close(e); + throw e; + } + } + } + + // Fill in the inputChunkAddrs, inputChunkSizes, and outputChunkAddrs arrays to point + // into the chunks in the input and output buffers. + private void buildAddrsAndSizes(CloseableArray inputs, + long[] inputChunkAddrs, long[] inputChunkSizes, + CloseableArray compressedBuffers, long[] outputChunkAddrs) { + // setup the input addresses and sizes + int chunkIdx = 0; + for (BaseDeviceMemoryBuffer input : inputs.getArray()) { + final int numChunksInBuffer = getNumChunksInBuffer(input); + for (int i = 0; i < numChunksInBuffer; i++) { + inputChunkAddrs[chunkIdx] = input.getAddress() + i * chunkSize; + inputChunkSizes[chunkIdx] = (i != numChunksInBuffer - 1) ? chunkSize + : (input.getLength() - (long) i * chunkSize); + ++chunkIdx; + } + } + assert chunkIdx == inputChunkAddrs.length; + assert chunkIdx == inputChunkSizes.length; + + // setup output addresses + chunkIdx = 0; + for (DeviceMemoryBuffer buffer : compressedBuffers.getArray()) { + assert buffer.getLength() % maxOutputChunkSize == 0; + long numChunksInBuffer = buffer.getLength() / maxOutputChunkSize; + long baseAddr = buffer.getAddress(); + for (int i = 0; i < numChunksInBuffer; i++) { + outputChunkAddrs[chunkIdx++] = baseAddr + i * maxOutputChunkSize; + } + } + assert chunkIdx == outputChunkAddrs.length; + } + + // Write input addresses, output addresses and sizes contiguously into a DeviceMemoryBuffer. + private DeviceMemoryBuffer putAddrsAndSizesOnDevice(long[] inputAddrs, long[] inputSizes, + long[] outputAddrs, Cuda.Stream stream) { + final long totalSize = inputAddrs.length * 8L * 3; // space for input, output, and size arrays + final long outputAddrsOffset = inputAddrs.length * 8L; + final long sizesOffset = outputAddrsOffset + inputAddrs.length * 8L; + try (NvtxRange range = new NvtxRange("putAddrsAndSizesOnDevice", NvtxColor.YELLOW)) { + try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(totalSize); + DeviceMemoryBuffer result = DeviceMemoryBuffer.allocate(totalSize)) { + hostbuf.setLongs(0, inputAddrs, 0, inputAddrs.length); + hostbuf.setLongs(outputAddrsOffset, outputAddrs, 0, outputAddrs.length); + for (int i = 0; i < inputSizes.length; i++) { + hostbuf.setLong(sizesOffset + i * 8L, inputSizes[i]); + } + result.copyFromHostBuffer(hostbuf, stream); + result.incRefCount(); + return result; + } + } + } + + // Synchronously copy the resulting compressed sizes from device memory to host memory. + private long[] getOutputChunkSizes(BaseDeviceMemoryBuffer devChunkSizes, Cuda.Stream stream) { + try (NvtxRange range = new NvtxRange("getOutputChunkSizes", NvtxColor.YELLOW)) { + try (HostMemoryBuffer hostbuf = HostMemoryBuffer.allocate(devChunkSizes.getLength())) { + hostbuf.copyFromDeviceBuffer(devChunkSizes, stream); + int numChunks = (int) (devChunkSizes.getLength() / 8); + long[] result = new long[numChunks]; + for (int i = 0; i < numChunks; i++) { + long size = hostbuf.getLong(i * 8L); + assert size < Integer.MAX_VALUE : "output size is too big"; + result[i] = size; + } + return result; + } + } + } + + // Stitch together the individual chunks into the result buffers. + // Each result buffer has metadata at the beginning, followed by compressed chunks. + // This is done by building up parallel lists of source addr, dest addr and size and + // then calling multiBufferCopyAsync() + private DeviceMemoryBuffer[] stitchOutput(int[] chunksPerInput, + DeviceMemoryBuffer compressedChunkSizes, long[] outputChunkAddrs, + long[] outputChunkSizes, Cuda.Stream stream) { + try (NvtxRange range = new NvtxRange("stitchOutput", NvtxColor.YELLOW)) { + final int numOutputs = chunksPerInput.length; + final long chunkSizesAddr = compressedChunkSizes.getAddress(); + long[] outputBufferSizes = calcOutputBufferSizes(chunksPerInput, outputChunkSizes); + try (CloseableArray outputs = + CloseableArray.wrap(new DeviceMemoryBuffer[numOutputs])) { + // Each chunk needs to be copied, and each output needs a copy of the + // compressed chunk size vector representing the metadata. + final int totalBuffersToCopy = numOutputs + outputChunkAddrs.length; + long[] destAddrs = new long[totalBuffersToCopy]; + long[] srcAddrs = new long[totalBuffersToCopy]; + long[] sizes = new long[totalBuffersToCopy]; + int copyBufferIdx = 0; + int chunkIdx = 0; + for (int outputIdx = 0; outputIdx < numOutputs; outputIdx++) { + DeviceMemoryBuffer outputBuffer = + DeviceMemoryBuffer.allocate(outputBufferSizes[outputIdx]); + outputs.set(outputIdx, outputBuffer); + final long outputBufferAddr = outputBuffer.getAddress(); + final long numChunks = chunksPerInput[outputIdx]; + final long metadataSize = numChunks * METADATA_BYTES_PER_CHUNK; + + // setup a copy of the metadata at the front of the output buffer + srcAddrs[copyBufferIdx] = chunkSizesAddr + chunkIdx * 8; + destAddrs[copyBufferIdx] = outputBufferAddr; + sizes[copyBufferIdx] = metadataSize; + ++copyBufferIdx; + + // setup copies of the compressed chunks for this output buffer + long nextChunkAddr = outputBufferAddr + metadataSize; + for (int i = 0; i < numChunks; ++i) { + srcAddrs[copyBufferIdx] = outputChunkAddrs[chunkIdx]; + destAddrs[copyBufferIdx] = nextChunkAddr; + final long chunkSize = outputChunkSizes[chunkIdx]; + sizes[copyBufferIdx] = chunkSize; + copyBufferIdx++; + chunkIdx++; + nextChunkAddr += chunkSize; + } + } + assert copyBufferIdx == totalBuffersToCopy; + assert chunkIdx == outputChunkAddrs.length; + assert chunkIdx == outputChunkSizes.length; + + Cuda.multiBufferCopyAsync(destAddrs, srcAddrs, sizes, stream); + return outputs.release(); + } + } + } + + // Calculate the sizes for each output buffer (metadata plus size of compressed chunks) + private long[] calcOutputBufferSizes(int[] chunksPerInput, long[] outputChunkSizes) { + long[] sizes = new long[chunksPerInput.length]; + int chunkIdx = 0; + for (int i = 0; i < sizes.length; i++) { + final int chunksInBuffer = chunksPerInput[i]; + final int chunkEndIdx = chunkIdx + chunksInBuffer; + // metadata stored in front of compressed data + long bufferSize = METADATA_BYTES_PER_CHUNK * chunksInBuffer; + // add in the compressed chunk sizes to get the total size + while (chunkIdx < chunkEndIdx) { + bufferSize += outputChunkSizes[chunkIdx++]; + } + sizes[i] = bufferSize; + } + assert chunkIdx == outputChunkSizes.length; + return sizes; + } + + /** + * Get the temporary workspace size required to perform compression of an entire batch. + * @param batchSize number of chunks in the batch + * @param maxChunkSize maximum size of an uncompressed chunk in bytes + * @return The size of required temporary workspace in bytes to compress the batch. + */ + protected abstract long batchedCompressGetTempSize(long batchSize, long maxChunkSize); + + /** + * Asynchronously compress a batch of buffers. Note that compressedSizesOutPtr must + * point to pinned memory for this operation to be asynchronous. + * @param devInPtrs device address of uncompressed buffer addresses vector + * @param devInSizes device address of uncompressed buffer sizes vector + * @param chunkSize maximum size of an uncompressed chunk in bytes + * @param batchSize number of chunks in the batch + * @param tempPtr device address of the temporary workspace buffer + * @param tempSize size of the temporary workspace buffer in bytes + * @param devOutPtrs device address of output buffer addresses vector + * @param compressedSizesOutPtr device address where to write the sizes of the + * compressed data written to the corresponding + * output buffers. Must point to a buffer with + * at least 8 bytes of memory per output buffer + * in the batch. + * @param stream CUDA stream to use + */ + protected abstract void batchedCompressAsync(long devInPtrs, long devInSizes, long chunkSize, + long batchSize, long tempPtr, long tempSize, long devOutPtrs, long compressedSizesOutPtr, + long stream); +} diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedDecompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedDecompressor.java new file mode 100644 index 00000000000..5543d2dcb64 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedDecompressor.java @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf.nvcomp; + +import ai.rapids.cudf.CloseableArray; +import ai.rapids.cudf.Cuda; +import ai.rapids.cudf.BaseDeviceMemoryBuffer; +import ai.rapids.cudf.DeviceMemoryBuffer; +import ai.rapids.cudf.HostMemoryBuffer; +import ai.rapids.cudf.NvtxColor; +import ai.rapids.cudf.NvtxRange; + +import java.util.Arrays; + +/** Decompressor that operates on multiple input buffers in a batch */ +public abstract class BatchedDecompressor { + + private final long chunkSize; + + /** + * Construct a batched decompressor instance + * @param chunkSize maximum uncompressed block size, must match value used + * during compression + */ + public BatchedDecompressor(long chunkSize) { + this.chunkSize = chunkSize; + } + + /** + * Asynchronously decompress a batch of buffers + * @param origInputs buffers to decompress, will be closed by this operation + * @param outputs output buffers that will contain the decompressed results, each must + * be sized to the exact decompressed size of the corresponding input + * @param stream CUDA stream to use + */ + public void decompressAsync(BaseDeviceMemoryBuffer[] origInputs, + BaseDeviceMemoryBuffer[] outputs, Cuda.Stream stream) { + try (CloseableArray inputs = + CloseableArray.wrap(Arrays.copyOf(origInputs, origInputs.length))) { + BatchedCompressor.validateChunkSize(chunkSize); + if (origInputs.length != outputs.length) { + throw new IllegalArgumentException("number of inputs must match number of outputs"); + } + final int numInputs = inputs.size(); + if (numInputs == 0) { + return; + } + + int[] chunksPerInput = new int[numInputs]; + long totalChunks = 0; + for (int i = 0; i < numInputs; i++) { + // use output size to determine number of chunks in the input, as the output buffer + // must be exactly sized to the uncompressed data + BaseDeviceMemoryBuffer buffer = outputs[i]; + int numBufferChunks = getNumChunksInBuffer(chunkSize, buffer); + chunksPerInput[i] = numBufferChunks; + totalChunks += numBufferChunks; + } + + final long tempBufferSize = batchedDecompressGetTempSize(totalChunks, chunkSize); + try (DeviceMemoryBuffer devAddrsSizes = buildAddrsSizesBuffer(chunkSize, totalChunks, + inputs.getArray(), chunksPerInput, outputs, stream); + DeviceMemoryBuffer devTemp = DeviceMemoryBuffer.allocate(tempBufferSize)) { + // buffer containing addresses and sizes contains four vectors of longs in this order: + // - compressed chunk input addresses + // - chunk output buffer addresses + // - compressed chunk sizes + // - uncompressed chunk sizes + final long inputAddrsPtr = devAddrsSizes.getAddress(); + final long outputAddrsPtr = inputAddrsPtr + totalChunks * 8; + final long inputSizesPtr = outputAddrsPtr + totalChunks * 8; + final long outputSizesPtr = inputSizesPtr + totalChunks * 8; + batchedDecompressAsync(inputAddrsPtr, inputSizesPtr, outputSizesPtr, totalChunks, + devTemp.getAddress(), devTemp.getLength(), outputAddrsPtr, stream.getStream()); + } + } + } + + private static int getNumChunksInBuffer(long chunkSize, BaseDeviceMemoryBuffer buffer) { + return (int) ((buffer.getLength() + chunkSize - 1) / chunkSize); + } + + /** + * Build a device memory buffer containing four vectors of longs in the following order: + *
    + *
  • compressed chunk input addresses
  • + *
  • uncompressed chunk output addresses
  • + *
  • compressed chunk sizes
  • + *
  • uncompressed chunk sizes
  • + *
+ * Each vector contains as many longs as the number of chunks being decompressed + * @param chunkSize maximum uncompressed size of a chunk + * @param totalChunks total number of chunks to be decompressed + * @param inputs device buffers containing the compressed data + * @param chunksPerInput number of compressed chunks per input buffer + * @param outputs device buffers that will hold the uncompressed output + * @param stream CUDA stream to use + * @return device buffer containing address and size vectors + */ + private static DeviceMemoryBuffer buildAddrsSizesBuffer(long chunkSize, long totalChunks, + BaseDeviceMemoryBuffer[] inputs, int[] chunksPerInput, BaseDeviceMemoryBuffer[] outputs, + Cuda.Stream stream) { + final long totalBufferSize = totalChunks * 8L * 4L; + try (NvtxRange range = new NvtxRange("buildAddrSizesBuffer", NvtxColor.YELLOW)) { + try (HostMemoryBuffer metadata = fetchMetadata(totalChunks, inputs, chunksPerInput, stream); + HostMemoryBuffer hostAddrsSizes = HostMemoryBuffer.allocate(totalBufferSize); + DeviceMemoryBuffer devAddrsSizes = DeviceMemoryBuffer.allocate(totalBufferSize)) { + // Build four long vectors in the AddrsSizes buffer: + // - compressed input address (one per chunk) + // - uncompressed output address (one per chunk) + // - compressed input size (one per chunk) + // - uncompressed input size (one per chunk) + final long srcAddrsOffset = 0; + final long destAddrsOffset = srcAddrsOffset + totalChunks * 8L; + final long srcSizesOffset = destAddrsOffset + totalChunks * 8L; + final long destSizesOffset = srcSizesOffset + totalChunks * 8L; + long chunkIdx = 0; + for (int inputIdx = 0; inputIdx < inputs.length; inputIdx++) { + final BaseDeviceMemoryBuffer input = inputs[inputIdx]; + final BaseDeviceMemoryBuffer output = outputs[inputIdx]; + final int numChunksInInput = chunksPerInput[inputIdx]; + long srcAddr = input.getAddress() + + BatchedCompressor.METADATA_BYTES_PER_CHUNK * numChunksInInput; + long destAddr = output.getAddress(); + final long chunkIdxEnd = chunkIdx + numChunksInInput; + while (chunkIdx < chunkIdxEnd) { + final long srcChunkSize = metadata.getLong(chunkIdx * 8); + final long destChunkSize = (chunkIdx < chunkIdxEnd - 1) ? chunkSize + : output.getAddress() + output.getLength() - destAddr; + hostAddrsSizes.setLong(srcAddrsOffset + chunkIdx * 8, srcAddr); + hostAddrsSizes.setLong(destAddrsOffset + chunkIdx * 8, destAddr); + hostAddrsSizes.setLong(srcSizesOffset + chunkIdx * 8, srcChunkSize); + hostAddrsSizes.setLong(destSizesOffset + chunkIdx * 8, destChunkSize); + srcAddr += srcChunkSize; + destAddr += destChunkSize; + ++chunkIdx; + } + } + devAddrsSizes.copyFromHostBuffer(hostAddrsSizes, stream); + devAddrsSizes.incRefCount(); + return devAddrsSizes; + } + } + } + + /** + * Fetch the metadata at the front of each input in a single, contiguous host buffer. + * @param totalChunks total number of compressed chunks + * @param inputs buffers containing the compressed data + * @param chunksPerInput number of compressed chunks for the corresponding input + * @param stream CUDA stream to use + * @return host buffer containing all of the metadata + */ + private static HostMemoryBuffer fetchMetadata(long totalChunks, BaseDeviceMemoryBuffer[] inputs, + int[] chunksPerInput, Cuda.Stream stream) { + try (NvtxRange range = new NvtxRange("fetchMetadata", NvtxColor.PURPLE)) { + // one long per chunk containing the compressed size + final long totalMetadataSize = totalChunks * BatchedCompressor.METADATA_BYTES_PER_CHUNK; + // Build corresponding vectors of destination addresses, source addresses and sizes. + long[] destAddrs = new long[inputs.length]; + long[] srcAddrs = new long[inputs.length]; + long[] sizes = new long[inputs.length]; + try (HostMemoryBuffer hostMetadata = HostMemoryBuffer.allocate(totalMetadataSize); + DeviceMemoryBuffer devMetadata = DeviceMemoryBuffer.allocate(totalMetadataSize)) { + long destCopyAddr = devMetadata.getAddress(); + for (int inputIdx = 0; inputIdx < inputs.length; inputIdx++) { + final BaseDeviceMemoryBuffer input = inputs[inputIdx]; + final long copySize = + chunksPerInput[inputIdx] * BatchedCompressor.METADATA_BYTES_PER_CHUNK; + destAddrs[inputIdx] = destCopyAddr; + srcAddrs[inputIdx] = input.getAddress(); + sizes[inputIdx] = copySize; + destCopyAddr += copySize; + } + Cuda.multiBufferCopyAsync(destAddrs, srcAddrs, sizes, stream); + hostMetadata.copyFromDeviceBuffer(devMetadata, stream); + hostMetadata.incRefCount(); + return hostMetadata; + } + } + } + + /** + * Computes the temporary storage size in bytes needed to decompress a compressed batch. + * @param numChunks number of chunks in the batch + * @param maxUncompressedChunkBytes maximum uncompressed size of any chunk in bytes + * @return number of temporary storage bytes needed to decompress the batch + */ + protected abstract long batchedDecompressGetTempSize(long numChunks, + long maxUncompressedChunkBytes); + + /** + * Asynchronously decompress a batch of compressed data buffers. + * @param devInPtrs device address of compressed input buffer addresses vector + * @param devInSizes device address of compressed input buffer sizes vector + * @param devOutSizes device address of uncompressed buffer sizes vector + * @param batchSize number of buffers in the batch + * @param tempPtr device address of the temporary decompression space + * @param tempSize size of the temporary decompression space in bytes + * @param devOutPtrs device address of uncompressed output buffer addresses vector + * @param stream CUDA stream to use + */ + protected abstract void batchedDecompressAsync(long devInPtrs, long devInSizes, + long devOutSizes, long batchSize, long tempPtr, long tempSize, long devOutPtrs, + long stream); +} diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java index 1aa7e5e11a0..58c0e7ee169 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Compressor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,305 +16,31 @@ package ai.rapids.cudf.nvcomp; -import ai.rapids.cudf.BaseDeviceMemoryBuffer; -import ai.rapids.cudf.CloseableArray; -import ai.rapids.cudf.Cuda; -import ai.rapids.cudf.DefaultHostMemoryAllocator; -import ai.rapids.cudf.DeviceMemoryBuffer; -import ai.rapids.cudf.HostMemoryAllocator; -import ai.rapids.cudf.HostMemoryBuffer; -import ai.rapids.cudf.MemoryBuffer; -import ai.rapids.cudf.NvtxColor; -import ai.rapids.cudf.NvtxRange; - /** Multi-buffer LZ4 compressor */ -public class BatchedLZ4Compressor { - private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); - - static final long MAX_CHUNK_SIZE = 16777216; // in bytes - // each chunk has a 64-bit integer value as metadata containing the compressed size - static final long METADATA_BYTES_PER_CHUNK = 8; - - private final long chunkSize; - private final long targetIntermediateBufferSize; - private final long maxOutputChunkSize; +public class BatchedLZ4Compressor extends BatchedCompressor { /** * Construct a batched LZ4 compressor instance - * @param chunkSize maximum amount of uncompressed data to compress as a single chunk. Inputs - * larger than this will be compressed in multiple chunks. - * @param targetIntermediateBufferSize desired maximum size of intermediate device buffers - * used during compression. - */ - public BatchedLZ4Compressor(long chunkSize, long targetIntermediateBufferSize) { - validateChunkSize(chunkSize); - this.chunkSize = chunkSize; - this.maxOutputChunkSize = NvcompJni.batchedLZ4CompressGetMaxOutputChunkSize(chunkSize); - assert maxOutputChunkSize < Integer.MAX_VALUE; - this.targetIntermediateBufferSize = Math.max(targetIntermediateBufferSize, maxOutputChunkSize); - } - - /** - * Compress a batch of buffers with LZ4. The input buffers will be closed. - * @param origInputs buffers to compress - * @param stream CUDA stream to use - * @return compressed buffers corresponding to the input buffers + * @param chunkSize maximum amount of uncompressed data to compress as a single chunk. + * Inputs larger than this will be compressed in multiple chunks. + * @param maxIntermediateBufferSize desired maximum size of intermediate device buffers + * used during compression. */ - public DeviceMemoryBuffer[] compress(BaseDeviceMemoryBuffer[] origInputs, Cuda.Stream stream) { - try (CloseableArray inputs = CloseableArray.wrap(origInputs)) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("Illegal chunk size: " + chunkSize); - } - final int numInputs = inputs.size(); - if (numInputs == 0) { - return new DeviceMemoryBuffer[0]; - } - - // Each buffer is broken up into chunkSize chunks for compression. Calculate how many - // chunks are needed for each input buffer. - int[] chunksPerInput = new int[numInputs]; - int numChunks = 0; - for (int i = 0; i < numInputs; i++) { - BaseDeviceMemoryBuffer buffer = inputs.get(i); - int numBufferChunks = getNumChunksInBuffer(buffer); - chunksPerInput[i] = numBufferChunks; - numChunks += numBufferChunks; - } - - // Allocate buffers for each chunk and generate parallel lists of chunk source addresses, - // chunk destination addresses, and sizes. - try (CloseableArray compressedBuffers = - allocCompressedBuffers(numChunks, stream); - DeviceMemoryBuffer compressedChunkSizes = - DeviceMemoryBuffer.allocate(numChunks * 8L, stream)) { - long[] inputChunkAddrs = new long[numChunks]; - long[] inputChunkSizes = new long[numChunks]; - long[] outputChunkAddrs = new long[numChunks]; - buildAddrsAndSizes(inputs, inputChunkAddrs, inputChunkSizes, - compressedBuffers, outputChunkAddrs); - - long[] outputChunkSizes; - final long tempBufferSize = NvcompJni.batchedLZ4CompressGetTempSize(numChunks, chunkSize); - try (DeviceMemoryBuffer addrsAndSizes = - putAddrsAndSizesOnDevice(inputChunkAddrs, inputChunkSizes, outputChunkAddrs, stream); - DeviceMemoryBuffer tempBuffer = DeviceMemoryBuffer.allocate(tempBufferSize, stream)) { - final long devOutputAddrsPtr = addrsAndSizes.getAddress() + numChunks * 8L; - final long devInputSizesPtr = devOutputAddrsPtr + numChunks * 8L; - NvcompJni.batchedLZ4CompressAsync( - addrsAndSizes.getAddress(), - devInputSizesPtr, - chunkSize, - numChunks, - tempBuffer.getAddress(), - tempBufferSize, - devOutputAddrsPtr, - compressedChunkSizes.getAddress(), - stream.getStream()); - } - - // Synchronously copy the resulting compressed sizes per chunk. - outputChunkSizes = getOutputChunkSizes(compressedChunkSizes, stream); - - // inputs are no longer needed at this point, so free them early - inputs.close(); - - // Combine compressed chunks into output buffers corresponding to each original input - return stitchOutput(chunksPerInput, compressedChunkSizes, outputChunkAddrs, - outputChunkSizes, stream); - } - } - } - - static void validateChunkSize(long chunkSize) { - if (chunkSize <= 0 || chunkSize > MAX_CHUNK_SIZE) { - throw new IllegalArgumentException("Invalid chunk size: " + chunkSize + " Max chunk size is: " - + MAX_CHUNK_SIZE + " bytes"); - } + public BatchedLZ4Compressor(long chunkSize, long maxIntermediateBufferSize) { + super(chunkSize, NvcompJni.batchedLZ4CompressGetMaxOutputChunkSize(chunkSize), + maxIntermediateBufferSize); } - private static long ceilingDivide(long x, long y) { - return (x + y - 1) / y; - } - - private int getNumChunksInBuffer(MemoryBuffer buffer) { - return (int) ceilingDivide(buffer.getLength(), chunkSize); - } - - private CloseableArray allocCompressedBuffers(long numChunks, - Cuda.Stream stream) { - final long chunksPerBuffer = targetIntermediateBufferSize / maxOutputChunkSize; - final long numBuffers = ceilingDivide(numChunks, chunksPerBuffer); - if (numBuffers > Integer.MAX_VALUE) { - throw new IllegalStateException("Too many chunks"); - } - try (NvtxRange range = new NvtxRange("allocCompressedBuffers", NvtxColor.YELLOW)) { - CloseableArray buffers = CloseableArray.wrap( - new DeviceMemoryBuffer[(int) numBuffers]); - try { - // allocate all of the max-chunks intermediate compressed buffers - for (int i = 0; i < buffers.size() - 1; ++i) { - buffers.set(i, DeviceMemoryBuffer.allocate(chunksPerBuffer * maxOutputChunkSize, stream)); - } - // allocate the tail intermediate compressed buffer that may be smaller than the others - buffers.set(buffers.size() - 1, DeviceMemoryBuffer.allocate( - (numChunks - chunksPerBuffer * (buffers.size() - 1)) * maxOutputChunkSize, stream)); - return buffers; - } catch (Exception e) { - buffers.close(e); - throw e; - } - } - } - - // Fill in the inputChunkAddrs, inputChunkSizes, and outputChunkAddrs arrays to point - // into the chunks in the input and output buffers. - private void buildAddrsAndSizes(CloseableArray inputs, - long[] inputChunkAddrs, - long[] inputChunkSizes, - CloseableArray compressedBuffers, - long[] outputChunkAddrs) { - // setup the input addresses and sizes - int chunkIdx = 0; - for (BaseDeviceMemoryBuffer input : inputs.getArray()) { - final int numChunksInBuffer = getNumChunksInBuffer(input); - for (int i = 0; i < numChunksInBuffer; i++) { - inputChunkAddrs[chunkIdx] = input.getAddress() + i * chunkSize; - inputChunkSizes[chunkIdx] = (i != numChunksInBuffer - 1) ? chunkSize - : (input.getLength() - (long) i * chunkSize); - ++chunkIdx; - } - } - assert chunkIdx == inputChunkAddrs.length; - assert chunkIdx == inputChunkSizes.length; - - // setup output addresses - chunkIdx = 0; - for (DeviceMemoryBuffer buffer : compressedBuffers.getArray()) { - assert buffer.getLength() % maxOutputChunkSize == 0; - long numChunksInBuffer = buffer.getLength() / maxOutputChunkSize; - long baseAddr = buffer.getAddress(); - for (int i = 0; i < numChunksInBuffer; i++) { - outputChunkAddrs[chunkIdx++] = baseAddr + i * maxOutputChunkSize; - } - } - assert chunkIdx == outputChunkAddrs.length; - } - - // Write input addresses, output addresses and sizes contiguously into a DeviceMemoryBuffer. - private DeviceMemoryBuffer putAddrsAndSizesOnDevice(long[] inputAddrs, - long[] inputSizes, - long[] outputAddrs, - Cuda.Stream stream) { - final long totalSize = inputAddrs.length * 8L * 3; // space for input, output, and size arrays - final long outputAddrsOffset = inputAddrs.length * 8L; - final long sizesOffset = outputAddrsOffset + inputAddrs.length * 8L; - try (NvtxRange range = new NvtxRange("putAddrsAndSizesOnDevice", NvtxColor.YELLOW)) { - try (HostMemoryBuffer hostbuf = hostMemoryAllocator.allocate(totalSize); - DeviceMemoryBuffer result = DeviceMemoryBuffer.allocate(totalSize)) { - hostbuf.setLongs(0, inputAddrs, 0, inputAddrs.length); - hostbuf.setLongs(outputAddrsOffset, outputAddrs, 0, outputAddrs.length); - for (int i = 0; i < inputSizes.length; i++) { - hostbuf.setLong(sizesOffset + i * 8L, inputSizes[i]); - } - result.copyFromHostBuffer(hostbuf, stream); - result.incRefCount(); - return result; - } - } - } - - // Synchronously copy the resulting compressed sizes from device memory to host memory. - private long[] getOutputChunkSizes(BaseDeviceMemoryBuffer devChunkSizes, Cuda.Stream stream) { - try (NvtxRange range = new NvtxRange("getOutputChunkSizes", NvtxColor.YELLOW)) { - try (HostMemoryBuffer hostbuf = hostMemoryAllocator.allocate(devChunkSizes.getLength())) { - hostbuf.copyFromDeviceBuffer(devChunkSizes, stream); - int numChunks = (int) (devChunkSizes.getLength() / 8); - long[] result = new long[numChunks]; - for (int i = 0; i < numChunks; i++) { - long size = hostbuf.getLong(i * 8L); - assert size < Integer.MAX_VALUE : "output size is too big"; - result[i] = size; - } - return result; - } - } - } - - // Stitch together the individual chunks into the result buffers. - // Each result buffer has metadata at the beginning, followed by compressed chunks. - // This is done by building up parallel lists of source addr, dest addr and size and - // then calling multiBufferCopyAsync() - private DeviceMemoryBuffer[] stitchOutput(int[] chunksPerInput, - DeviceMemoryBuffer compressedChunkSizes, - long[] outputChunkAddrs, - long[] outputChunkSizes, - Cuda.Stream stream) { - try (NvtxRange range = new NvtxRange("stitchOutput", NvtxColor.YELLOW)) { - final int numOutputs = chunksPerInput.length; - final long chunkSizesAddr = compressedChunkSizes.getAddress(); - long[] outputBufferSizes = calcOutputBufferSizes(chunksPerInput, outputChunkSizes); - try (CloseableArray outputs = - CloseableArray.wrap(new DeviceMemoryBuffer[numOutputs])) { - // Each chunk needs to be copied, and each output needs a copy of the - // compressed chunk size vector representing the metadata. - final int totalBuffersToCopy = numOutputs + outputChunkAddrs.length; - long[] destAddrs = new long[totalBuffersToCopy]; - long[] srcAddrs = new long[totalBuffersToCopy]; - long[] sizes = new long[totalBuffersToCopy]; - int copyBufferIdx = 0; - int chunkIdx = 0; - for (int outputIdx = 0; outputIdx < numOutputs; outputIdx++) { - DeviceMemoryBuffer outputBuffer = DeviceMemoryBuffer.allocate(outputBufferSizes[outputIdx]); - final long outputBufferAddr = outputBuffer.getAddress(); - outputs.set(outputIdx, outputBuffer); - final long numChunks = chunksPerInput[outputIdx]; - final long metadataSize = numChunks * METADATA_BYTES_PER_CHUNK; - - // setup a copy of the metadata at the front of the output buffer - srcAddrs[copyBufferIdx] = chunkSizesAddr + chunkIdx * 8; - destAddrs[copyBufferIdx] = outputBufferAddr; - sizes[copyBufferIdx] = metadataSize; - ++copyBufferIdx; - - // setup copies of the compressed chunks for this output buffer - long nextChunkAddr = outputBufferAddr + metadataSize; - for (int i = 0; i < numChunks; ++i) { - srcAddrs[copyBufferIdx] = outputChunkAddrs[chunkIdx]; - destAddrs[copyBufferIdx] = nextChunkAddr; - final long chunkSize = outputChunkSizes[chunkIdx]; - sizes[copyBufferIdx] = chunkSize; - copyBufferIdx++; - chunkIdx++; - nextChunkAddr += chunkSize; - } - } - assert copyBufferIdx == totalBuffersToCopy; - assert chunkIdx == outputChunkAddrs.length; - assert chunkIdx == outputChunkSizes.length; - - Cuda.multiBufferCopyAsync(destAddrs, srcAddrs, sizes, stream); - return outputs.release(); - } - } + @Override + protected long batchedCompressGetTempSize(long batchSize, long maxChunkSize) { + return NvcompJni.batchedLZ4CompressGetTempSize(batchSize, maxChunkSize); } - // Calculate the list of sizes for each output buffer (metadata plus size of compressed chunks) - private long[] calcOutputBufferSizes(int[] chunksPerInput, - long[] outputChunkSizes) { - long[] sizes = new long[chunksPerInput.length]; - int chunkIdx = 0; - for (int i = 0; i < sizes.length; i++) { - final int chunksInBuffer = chunksPerInput[i]; - final int chunkEndIdx = chunkIdx + chunksInBuffer; - // metadata stored in front of compressed data - long bufferSize = METADATA_BYTES_PER_CHUNK * chunksInBuffer; - // add in the compressed chunk sizes to get the total size - while (chunkIdx < chunkEndIdx) { - bufferSize += outputChunkSizes[chunkIdx++]; - } - sizes[i] = bufferSize; - } - assert chunkIdx == outputChunkSizes.length; - return sizes; + @Override + protected void batchedCompressAsync(long devInPtrs, long devInSizes, long chunkSize, + long batchSize, long tempPtr, long tempSize, long devOutPtrs, + long compressedSizesOutPtr, long stream) { + NvcompJni.batchedLZ4CompressAsync(devInPtrs, devInSizes, chunkSize, batchSize, + tempPtr, tempSize, devOutPtrs, compressedSizesOutPtr, stream); } } diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java index 40ad4d5e9ed..d78d537ea13 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedLZ4Decompressor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,18 +16,15 @@ package ai.rapids.cudf.nvcomp; -import ai.rapids.cudf.CloseableArray; import ai.rapids.cudf.Cuda; import ai.rapids.cudf.BaseDeviceMemoryBuffer; -import ai.rapids.cudf.DeviceMemoryBuffer; -import ai.rapids.cudf.HostMemoryBuffer; -import ai.rapids.cudf.NvtxColor; -import ai.rapids.cudf.NvtxRange; - -import java.util.Arrays; /** LZ4 decompressor that operates on multiple input buffers in a batch */ -public class BatchedLZ4Decompressor { +public class BatchedLZ4Decompressor extends BatchedDecompressor { + public BatchedLZ4Decompressor(long chunkSize) { + super(chunkSize); + } + /** * Asynchronously decompress a batch of buffers * @param chunkSize maximum uncompressed block size, must match value used during compression @@ -35,165 +32,24 @@ public class BatchedLZ4Decompressor { * @param outputs output buffers that will contain the compressed results, each must be sized * to the exact decompressed size of the corresponding input * @param stream CUDA stream to use + * + * Deprecated: Use the non-static version in the parent class instead. */ - public static void decompressAsync(long chunkSize, - BaseDeviceMemoryBuffer[] origInputs, - BaseDeviceMemoryBuffer[] outputs, - Cuda.Stream stream) { - try (CloseableArray inputs = - CloseableArray.wrap(Arrays.copyOf(origInputs, origInputs.length))) { - BatchedLZ4Compressor.validateChunkSize(chunkSize); - if (origInputs.length != outputs.length) { - throw new IllegalArgumentException("number of inputs must match number of outputs"); - } - final int numInputs = inputs.size(); - if (numInputs == 0) { - return; - } - - int[] chunksPerInput = new int[numInputs]; - long totalChunks = 0; - for (int i = 0; i < numInputs; i++) { - // use output size to determine number of chunks in the input, as the output buffer - // must be exactly sized to the uncompressed data - BaseDeviceMemoryBuffer buffer = outputs[i]; - int numBufferChunks = getNumChunksInBuffer(chunkSize, buffer); - chunksPerInput[i] = numBufferChunks; - totalChunks += numBufferChunks; - } - - final long tempBufferSize = NvcompJni.batchedLZ4DecompressGetTempSize(totalChunks, chunkSize); - try (DeviceMemoryBuffer devAddrsSizes = - buildAddrsSizesBuffer(chunkSize, totalChunks, inputs.getArray(), chunksPerInput, - outputs, stream); - DeviceMemoryBuffer devTemp = DeviceMemoryBuffer.allocate(tempBufferSize)) { - // buffer containing addresses and sizes contains four vectors of longs in this order: - // - compressed chunk input addresses - // - chunk output buffer addresses - // - compressed chunk sizes - // - uncompressed chunk sizes - final long inputAddrsPtr = devAddrsSizes.getAddress(); - final long outputAddrsPtr = inputAddrsPtr + totalChunks * 8; - final long inputSizesPtr = outputAddrsPtr + totalChunks * 8; - final long outputSizesPtr = inputSizesPtr + totalChunks * 8; - NvcompJni.batchedLZ4DecompressAsync( - inputAddrsPtr, - inputSizesPtr, - outputSizesPtr, - totalChunks, - devTemp.getAddress(), - devTemp.getLength(), - outputAddrsPtr, - stream.getStream()); - } - } + public static void decompressAsync(long chunkSize, BaseDeviceMemoryBuffer[] origInputs, + BaseDeviceMemoryBuffer[] outputs, Cuda.Stream stream) { + new BatchedLZ4Decompressor(chunkSize).decompressAsync(origInputs, outputs, stream); } - private static int getNumChunksInBuffer(long chunkSize, BaseDeviceMemoryBuffer buffer) { - return (int) ((buffer.getLength() + chunkSize - 1) / chunkSize); + @Override + protected long batchedDecompressGetTempSize(long numChunks, long maxUncompressedChunkBytes) { + return NvcompJni.batchedLZ4DecompressGetTempSize(numChunks, maxUncompressedChunkBytes); } - /** - * Build a device memory buffer containing four vectors of longs in the following order: - *
    - *
  • compressed chunk input addresses
  • - *
  • uncompressed chunk output addresses
  • - *
  • compressed chunk sizes
  • - *
  • uncompressed chunk sizes
  • - *
- * Each vector contains as many longs as the number of chunks being decompressed - * @param chunkSize maximum uncompressed size of a chunk - * @param totalChunks total number of chunks to be decompressed - * @param inputs device buffers containing the compressed data - * @param chunksPerInput number of compressed chunks per input buffer - * @param outputs device buffers that will hold the uncompressed output - * @param stream CUDA stream to use - * @return device buffer containing address and size vectors - */ - private static DeviceMemoryBuffer buildAddrsSizesBuffer(long chunkSize, - long totalChunks, - BaseDeviceMemoryBuffer[] inputs, - int[] chunksPerInput, - BaseDeviceMemoryBuffer[] outputs, - Cuda.Stream stream) { - final long totalBufferSize = totalChunks * 8L * 4L; - try (NvtxRange range = new NvtxRange("buildAddrSizesBuffer", NvtxColor.YELLOW)) { - try (HostMemoryBuffer metadata = fetchMetadata(totalChunks, inputs, chunksPerInput, stream); - HostMemoryBuffer hostAddrsSizes = HostMemoryBuffer.allocate(totalBufferSize); - DeviceMemoryBuffer devAddrsSizes = DeviceMemoryBuffer.allocate(totalBufferSize)) { - // Build four long vectors in the AddrsSizes buffer: - // - compressed input address (one per chunk) - // - uncompressed output address (one per chunk) - // - compressed input size (one per chunk) - // - uncompressed input size (one per chunk) - final long srcAddrsOffset = 0; - final long destAddrsOffset = srcAddrsOffset + totalChunks * 8L; - final long srcSizesOffset = destAddrsOffset + totalChunks * 8L; - final long destSizesOffset = srcSizesOffset + totalChunks * 8L; - long chunkIdx = 0; - for (int inputIdx = 0; inputIdx < inputs.length; inputIdx++) { - final BaseDeviceMemoryBuffer input = inputs[inputIdx]; - final BaseDeviceMemoryBuffer output = outputs[inputIdx]; - final int numChunksInInput = chunksPerInput[inputIdx]; - long srcAddr = input.getAddress() + - BatchedLZ4Compressor.METADATA_BYTES_PER_CHUNK * numChunksInInput; - long destAddr = output.getAddress(); - final long chunkIdxEnd = chunkIdx + numChunksInInput; - while (chunkIdx < chunkIdxEnd) { - final long srcChunkSize = metadata.getLong(chunkIdx * 8); - final long destChunkSize = (chunkIdx < chunkIdxEnd - 1) ? chunkSize - : output.getAddress() + output.getLength() - destAddr; - hostAddrsSizes.setLong(srcAddrsOffset + chunkIdx * 8, srcAddr); - hostAddrsSizes.setLong(destAddrsOffset + chunkIdx * 8, destAddr); - hostAddrsSizes.setLong(srcSizesOffset + chunkIdx * 8, srcChunkSize); - hostAddrsSizes.setLong(destSizesOffset + chunkIdx * 8, destChunkSize); - srcAddr += srcChunkSize; - destAddr += destChunkSize; - ++chunkIdx; - } - } - devAddrsSizes.copyFromHostBuffer(hostAddrsSizes, stream); - devAddrsSizes.incRefCount(); - return devAddrsSizes; - } - } + @Override + protected void batchedDecompressAsync(long devInPtrs, long devInSizes, long devOutSizes, + long batchSize, long tempPtr, long tempSize, long devOutPtrs, long stream) { + NvcompJni.batchedLZ4DecompressAsync(devInPtrs, devInSizes, devOutSizes, batchSize, tempPtr, + tempSize, devOutPtrs, stream); } - /** - * Fetch the metadata at the front of each input in a single, contiguous host buffer. - * @param totalChunks total number of compressed chunks - * @param inputs buffers containing the compressed data - * @param chunksPerInput number of compressed chunks for the corresponding input - * @param stream CUDA stream to use - * @return host buffer containing all of the metadata - */ - private static HostMemoryBuffer fetchMetadata(long totalChunks, - BaseDeviceMemoryBuffer[] inputs, - int[] chunksPerInput, - Cuda.Stream stream) { - try (NvtxRange range = new NvtxRange("fetchMetadata", NvtxColor.PURPLE)) { - // one long per chunk containing the compressed size - final long totalMetadataSize = totalChunks * BatchedLZ4Compressor.METADATA_BYTES_PER_CHUNK; - // Build corresponding vectors of destination addresses, source addresses and sizes. - long[] destAddrs = new long[inputs.length]; - long[] srcAddrs = new long[inputs.length]; - long[] sizes = new long[inputs.length]; - try (HostMemoryBuffer hostMetadata = HostMemoryBuffer.allocate(totalMetadataSize); - DeviceMemoryBuffer devMetadata = DeviceMemoryBuffer.allocate(totalMetadataSize)) { - long destCopyAddr = devMetadata.getAddress(); - for (int inputIdx = 0; inputIdx < inputs.length; inputIdx++) { - final BaseDeviceMemoryBuffer input = inputs[inputIdx]; - final long copySize = chunksPerInput[inputIdx] * BatchedLZ4Compressor.METADATA_BYTES_PER_CHUNK; - destAddrs[inputIdx] = destCopyAddr; - srcAddrs[inputIdx] = input.getAddress(); - sizes[inputIdx] = copySize; - destCopyAddr += copySize; - } - Cuda.multiBufferCopyAsync(destAddrs, srcAddrs, sizes, stream); - hostMetadata.copyFromDeviceBuffer(devMetadata, stream); - hostMetadata.incRefCount(); - return hostMetadata; - } - } - } } diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedZstdCompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedZstdCompressor.java new file mode 100644 index 00000000000..0532b4aa86d --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedZstdCompressor.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf.nvcomp; + +/** Multi-buffer ZSTD compressor */ +public class BatchedZstdCompressor extends BatchedCompressor { + /** + * Construct a batched ZSTD compressor instance + * @param chunkSize maximum amount of uncompressed data to compress as a single chunk. + * Inputs larger than this will be compressed in multiple chunks. + * @param maxIntermediateBufferSize desired maximum size of intermediate device buffers + * used during compression. + */ + public BatchedZstdCompressor(long chunkSize, long maxIntermediateBufferSize) { + super(chunkSize, NvcompJni.batchedZstdCompressGetMaxOutputChunkSize(chunkSize), + maxIntermediateBufferSize); + } + + @Override + protected long batchedCompressGetTempSize(long batchSize, long maxChunkSize) { + return NvcompJni.batchedZstdCompressGetTempSize(batchSize, maxChunkSize); + } + + @Override + protected void batchedCompressAsync(long devInPtrs, long devInSizes, long chunkSize, + long batchSize, long tempPtr, long tempSize, long devOutPtrs, + long compressedSizesOutPtr, long stream) { + NvcompJni.batchedZstdCompressAsync(devInPtrs, devInSizes, chunkSize, batchSize, + tempPtr, tempSize, devOutPtrs, compressedSizesOutPtr, stream); + } +} diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedZstdDecompressor.java b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedZstdDecompressor.java new file mode 100644 index 00000000000..ba11a236834 --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/BatchedZstdDecompressor.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.rapids.cudf.nvcomp; + +/** ZSTD decompressor that operates on multiple input buffers in a batch */ +public class BatchedZstdDecompressor extends BatchedDecompressor { + public BatchedZstdDecompressor(long chunkSize) { + super(chunkSize); + } + + @Override + protected long batchedDecompressGetTempSize(long numChunks, long maxUncompressedChunkBytes) { + return NvcompJni.batchedZstdDecompressGetTempSize(numChunks, maxUncompressedChunkBytes); + } + + @Override + protected void batchedDecompressAsync(long devInPtrs, long devInSizes, long devOutSizes, + long batchSize, long tempPtr, long tempSize, long devOutPtrs, long stream) { + NvcompJni.batchedZstdDecompressAsync(devInPtrs, devInSizes, devOutSizes, batchSize, tempPtr, + tempSize, devOutPtrs, stream); + } + +} diff --git a/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java b/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java index 57094008c08..1a21629a208 100644 --- a/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java +++ b/java/src/main/java/ai/rapids/cudf/nvcomp/NvcompJni.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ class NvcompJni { NativeDepsLoader.loadNativeDeps(); } + // For lz4 /** * Get the temporary workspace size required to perform compression of entire LZ4 batch. * @param batchSize number of chunks in the batch @@ -114,4 +115,97 @@ static native void batchedLZ4GetDecompressSizeAsync( long devOutSizes, long batchSize, long stream); + + // For zstd + /** + * Get the temporary workspace size required to perform compression of entire zstd batch. + * @param batchSize number of chunks in the batch + * @param maxChunkSize maximum size of an uncompressed chunk in bytes + * @return The size of required temporary workspace in bytes to compress the batch. + */ + static native long batchedZstdCompressGetTempSize(long batchSize, long maxChunkSize); + + /** + * Get the maximum size any chunk could compress to in a ZSTD batch. This is the minimum + * amount of output memory to allocate per chunk when batch compressing. + * @param maxChunkSize maximum size of an uncompressed chunk size in bytes + * @return maximum compressed output size of a chunk + */ + static native long batchedZstdCompressGetMaxOutputChunkSize(long maxChunkSize); + + /** + * Asynchronously compress a batch of buffers with ZSTD. Note that + * compressedSizesOutPtr must point to pinned memory for this operation + * to be asynchronous. + * @param devInPtrs device address of uncompressed buffer addresses vector + * @param devInSizes device address of uncompressed buffer sizes vector + * @param chunkSize maximum size of an uncompressed chunk in bytes + * @param batchSize number of chunks in the batch + * @param tempPtr device address of the temporary workspace buffer + * @param tempSize size of the temporary workspace buffer in bytes + * @param devOutPtrs device address of output buffer addresses vector + * @param compressedSizesOutPtr device address where to write the sizes of the + * compressed data written to the corresponding + * output buffers. Must point to a buffer with + * at least 8 bytes of memory per output buffer + * in the batch. + * @param stream CUDA stream to use + */ + static native void batchedZstdCompressAsync( + long devInPtrs, + long devInSizes, + long chunkSize, + long batchSize, + long tempPtr, + long tempSize, + long devOutPtrs, + long compressedSizesOutPtr, + long stream); + + /** + * Computes the temporary storage size in bytes needed to decompress a + * ZSTD-compressed batch. + * @param numChunks number of chunks in the batch + * @param maxUncompressedChunkBytes maximum uncompressed size of any chunk in bytes + * @return number of temporary storage bytes needed to decompress the batch + */ + static native long batchedZstdDecompressGetTempSize( + long numChunks, + long maxUncompressedChunkBytes); + + /** + * Asynchronously decompress a batch of ZSTD-compressed data buffers. + * @param devInPtrs device address of compressed input buffer addresses vector + * @param devInSizes device address of compressed input buffer sizes vector + * @param devOutSizes device address of uncompressed buffer sizes vector + * @param batchSize number of buffers in the batch + * @param tempPtr device address of the temporary decompression space + * @param tempSize size of the temporary decompression space in bytes + * @param devOutPtrs device address of uncompressed output buffer addresses vector + * @param stream CUDA stream to use + */ + static native void batchedZstdDecompressAsync( + long devInPtrs, + long devInSizes, + long devOutSizes, + long batchSize, + long tempPtr, + long tempSize, + long devOutPtrs, + long stream); + + /** + * Asynchronously calculates the decompressed size needed for each chunk. + * @param devInPtrs device address of compressed input buffer addresses vector + * @param devInSizes device address of compressed input buffer sizes vector + * @param devOutSizes device address of calculated decompress sizes vector + * @param batchSize number of buffers in the batch + * @param stream CUDA stream to use + */ + static native void batchedZstdGetDecompressSizeAsync( + long devInPtrs, + long devInSizes, + long devOutSizes, + long batchSize, + long stream); } diff --git a/java/src/main/native/src/NvcompJni.cpp b/java/src/main/native/src/NvcompJni.cpp index 47a24653549..8937438e922 100644 --- a/java/src/main/native/src/NvcompJni.cpp +++ b/java/src/main/native/src/NvcompJni.cpp @@ -20,6 +20,7 @@ #include #include +#include namespace { @@ -57,6 +58,7 @@ void check_nvcomp_status(JNIEnv* env, nvcompStatus_t status) extern "C" { +// methods for lz4 JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4CompressGetTempSize( JNIEnv* env, jclass, jlong j_batch_size, jlong j_max_chunk_size) { @@ -211,4 +213,158 @@ Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedLZ4GetDecompressSizeAsync(JNIEnv* en CATCH_STD(env, ); } +// methods for zstd +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedZstdCompressGetTempSize( + JNIEnv* env, jclass, jlong j_batch_size, jlong j_max_chunk_size) +{ + try { + cudf::jni::auto_set_device(env); + auto batch_size = static_cast(j_batch_size); + auto max_chunk_size = static_cast(j_max_chunk_size); + std::size_t temp_size = 0; + auto status = nvcompBatchedZstdCompressGetTempSize( + batch_size, max_chunk_size, nvcompBatchedZstdDefaultOpts, &temp_size); + check_nvcomp_status(env, status); + return static_cast(temp_size); + } + CATCH_STD(env, 0); +} + +JNIEXPORT jlong JNICALL +Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedZstdCompressGetMaxOutputChunkSize( + JNIEnv* env, jclass, jlong j_max_chunk_size) +{ + try { + cudf::jni::auto_set_device(env); + auto max_chunk_size = static_cast(j_max_chunk_size); + std::size_t max_output_size = 0; + auto status = nvcompBatchedZstdCompressGetMaxOutputChunkSize( + max_chunk_size, nvcompBatchedZstdDefaultOpts, &max_output_size); + check_nvcomp_status(env, status); + return static_cast(max_output_size); + } + CATCH_STD(env, 0); +} + +JNIEXPORT void JNICALL +Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedZstdCompressAsync(JNIEnv* env, + jclass, + jlong j_in_ptrs, + jlong j_in_sizes, + jlong j_chunk_size, + jlong j_batch_size, + jlong j_temp_ptr, + jlong j_temp_size, + jlong j_out_ptrs, + jlong j_compressed_sizes_out_ptr, + jlong j_stream) +{ + try { + cudf::jni::auto_set_device(env); + auto in_ptrs = reinterpret_cast(j_in_ptrs); + auto in_sizes = reinterpret_cast(j_in_sizes); + auto chunk_size = static_cast(j_chunk_size); + auto batch_size = static_cast(j_batch_size); + auto temp_ptr = reinterpret_cast(j_temp_ptr); + auto temp_size = static_cast(j_temp_size); + auto out_ptrs = reinterpret_cast(j_out_ptrs); + auto compressed_out_sizes = reinterpret_cast(j_compressed_sizes_out_ptr); + auto stream = reinterpret_cast(j_stream); + auto status = nvcompBatchedZstdCompressAsync(in_ptrs, + in_sizes, + chunk_size, + batch_size, + temp_ptr, + temp_size, + out_ptrs, + compressed_out_sizes, + nvcompBatchedZstdDefaultOpts, + stream); + check_nvcomp_status(env, status); + } + CATCH_STD(env, ); +} + +JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedZstdDecompressGetTempSize( + JNIEnv* env, jclass, jlong j_batch_size, jlong j_chunk_size) +{ + try { + cudf::jni::auto_set_device(env); + auto batch_size = static_cast(j_batch_size); + auto chunk_size = static_cast(j_chunk_size); + std::size_t temp_size = 0; + auto status = nvcompBatchedZstdDecompressGetTempSize(batch_size, chunk_size, &temp_size); + check_nvcomp_status(env, status); + return static_cast(temp_size); + } + CATCH_STD(env, 0); +} + +JNIEXPORT void JNICALL +Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedZstdDecompressAsync(JNIEnv* env, + jclass, + jlong j_in_ptrs, + jlong j_in_sizes, + jlong j_out_sizes, + jlong j_batch_size, + jlong j_temp_ptr, + jlong j_temp_size, + jlong j_out_ptrs, + jlong j_stream) +{ + try { + cudf::jni::auto_set_device(env); + auto compressed_ptrs = reinterpret_cast(j_in_ptrs); + auto compressed_sizes = reinterpret_cast(j_in_sizes); + auto uncompressed_sizes = reinterpret_cast(j_out_sizes); + auto batch_size = static_cast(j_batch_size); + auto temp_ptr = reinterpret_cast(j_temp_ptr); + auto temp_size = static_cast(j_temp_size); + auto uncompressed_ptrs = reinterpret_cast(j_out_ptrs); + auto stream = reinterpret_cast(j_stream); + auto uncompressed_statuses = rmm::device_uvector(batch_size, stream); + auto actual_uncompressed_sizes = rmm::device_uvector(batch_size, stream); + auto status = nvcompBatchedZstdDecompressAsync(compressed_ptrs, + compressed_sizes, + uncompressed_sizes, + actual_uncompressed_sizes.data(), + batch_size, + temp_ptr, + temp_size, + uncompressed_ptrs, + uncompressed_statuses.data(), + stream); + check_nvcomp_status(env, status); + if (!cudf::java::check_nvcomp_output_sizes( + uncompressed_sizes, actual_uncompressed_sizes.data(), batch_size, stream)) { + cudf::jni::throw_java_exception( + env, NVCOMP_ERROR_CLASS, "nvcomp decompress output size mismatch"); + } + } + CATCH_STD(env, ); +} + +JNIEXPORT void JNICALL +Java_ai_rapids_cudf_nvcomp_NvcompJni_batchedZstdGetDecompressSizeAsync(JNIEnv* env, + jclass, + jlong j_in_ptrs, + jlong j_in_sizes, + jlong j_out_sizes, + jlong j_batch_size, + jlong j_stream) +{ + try { + cudf::jni::auto_set_device(env); + auto compressed_ptrs = reinterpret_cast(j_in_ptrs); + auto compressed_sizes = reinterpret_cast(j_in_sizes); + auto uncompressed_sizes = reinterpret_cast(j_out_sizes); + auto batch_size = static_cast(j_batch_size); + auto stream = reinterpret_cast(j_stream); + auto status = nvcompBatchedZstdGetDecompressSizeAsync( + compressed_ptrs, compressed_sizes, uncompressed_sizes, batch_size, stream); + check_nvcomp_status(env, status); + } + CATCH_STD(env, ); +} + } // extern "C" diff --git a/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java b/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java index 66f4fe39109..4e8fc225257 100644 --- a/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java +++ b/java/src/test/java/ai/rapids/cudf/nvcomp/NvcompTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,18 +23,29 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.Optional; public class NvcompTest { private static final HostMemoryAllocator hostMemoryAllocator = DefaultHostMemoryAllocator.get(); private static final Logger log = LoggerFactory.getLogger(ColumnVector.class); + private final long chunkSize = 64 * 1024; + private final long targetIntermediteSize = Long.MAX_VALUE; + @Test void testBatchedLZ4RoundTripAsync() { + testBatchedRoundTripAsync(new BatchedLZ4Compressor(chunkSize, targetIntermediteSize), + new BatchedLZ4Decompressor(chunkSize)); + } + + @Test + void testBatchedZstdRoundTripAsync() { + testBatchedRoundTripAsync(new BatchedZstdCompressor(chunkSize, targetIntermediteSize), + new BatchedZstdDecompressor(chunkSize)); + } + + void testBatchedRoundTripAsync(BatchedCompressor comp, BatchedDecompressor decomp) { final Cuda.Stream stream = Cuda.DEFAULT_STREAM; - final long chunkSize = 64 * 1024; - final long targetIntermediteSize = Long.MAX_VALUE; final int maxElements = 1024 * 1024 + 1; final int numBuffers = 200; long[] data = new long[maxElements]; @@ -52,10 +63,8 @@ void testBatchedLZ4RoundTripAsync() { } // compress and decompress the buffers - BatchedLZ4Compressor compressor = new BatchedLZ4Compressor(chunkSize, targetIntermediteSize); - try (CloseableArray compressedBuffers = - CloseableArray.wrap(compressor.compress(originalBuffers.getArray(), stream)); + CloseableArray.wrap(comp.compress(originalBuffers.getArray(), stream)); CloseableArray uncompressedBuffers = CloseableArray.wrap(new DeviceMemoryBuffer[numBuffers])) { for (int i = 0; i < numBuffers; i++) { @@ -64,8 +73,8 @@ void testBatchedLZ4RoundTripAsync() { } // decompress takes ownership of the compressed buffers and will close them - BatchedLZ4Decompressor.decompressAsync(chunkSize, compressedBuffers.release(), - uncompressedBuffers.getArray(), stream); + decomp.decompressAsync(compressedBuffers.release(), uncompressedBuffers.getArray(), + stream); // check the decompressed results against the original for (int i = 0; i < numBuffers; ++i) {